You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by ji...@apache.org on 2022/11/09 10:25:04 UTC

[incubator-hugegraph] 01/33: implement 8 olap algorithms (#4)

This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git

commit 9e4e33f41ae7d879ede68461a41584adfdb76b0d
Author: Jermy Li <li...@baidu.com>
AuthorDate: Thu Apr 9 11:22:56 2020 +0800

    implement 8 olap algorithms (#4)
    
    * add olap algo api
    * improve source filter
    * fix louvain shaking with limit degree
    * catch exception for lpa and louvain
    * add 3 params for lpa: label, source_label, precision
    * improve louvain node store
    * remove vertices from class Community
    * move showCommunity to AbstractCommAlgorithm
    * add some parameters to AbstractAlgorithm
    * improve louvain log
    * improve clearPass and communities check
    * split louvain cache
    * fix degreeCentrality bug: degree is always < 500
    
    Change-Id: I2341b981dab44f43ac50ae0f8fa5e51b7acc1b5a
---
 .../com/baidu/hugegraph/api/job/AlgorithmAPI.java  |  84 +++
 .../java/com/baidu/hugegraph/job/AlgorithmJob.java |  71 ++
 .../hugegraph/job/algorithm/AbstractAlgorithm.java | 516 +++++++++++++++
 .../baidu/hugegraph/job/algorithm/Algorithm.java   |  35 +
 .../hugegraph/job/algorithm/AlgorithmPool.java     |  71 ++
 .../job/algorithm/CountEdgeAlgorithm.java          |  79 +++
 .../job/algorithm/CountVertexAlgorithm.java        |  79 +++
 .../job/algorithm/cent/AbstractCentAlgorithm.java  | 113 ++++
 .../cent/BetweenessCentralityAlgorithm.java        | 101 +++
 .../cent/ClosenessCentralityAlgorithm.java         | 111 ++++
 .../algorithm/cent/DegreeCentralityAlgorithm.java  | 140 ++++
 .../cent/EigenvectorCentralityAlgorithm.java       | 100 +++
 .../job/algorithm/comm/AbstractCommAlgorithm.java  |  78 +++
 .../algorithm/comm/ClusterCoeffcientAlgorithm.java |  70 ++
 .../job/algorithm/comm/LouvainAlgorithm.java       |  83 +++
 .../job/algorithm/comm/LouvainTraverser.java       | 715 +++++++++++++++++++++
 .../hugegraph/job/algorithm/comm/LpaAlgorithm.java | 263 ++++++++
 .../job/algorithm/comm/TriangleCountAlgorithm.java | 153 +++++
 18 files changed, 2862 insertions(+)

diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/AlgorithmAPI.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/AlgorithmAPI.java
new file mode 100644
index 000000000..c965e02a5
--- /dev/null
+++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/AlgorithmAPI.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.api.job;
+
+import java.util.Map;
+
+import javax.inject.Singleton;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.HugeGraph;
+import com.baidu.hugegraph.api.API;
+import com.baidu.hugegraph.api.filter.StatusFilter.Status;
+import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.core.GraphManager;
+import com.baidu.hugegraph.job.AlgorithmJob;
+import com.baidu.hugegraph.job.JobBuilder;
+import com.baidu.hugegraph.server.RestServer;
+import com.baidu.hugegraph.util.E;
+import com.baidu.hugegraph.util.JsonUtil;
+import com.baidu.hugegraph.util.Log;
+import com.codahale.metrics.annotation.Timed;
+import com.google.common.collect.ImmutableMap;
+
+@Path("graphs/{graph}/jobs/algorithm")
+@Singleton
+public class AlgorithmAPI extends API {
+
+    private static final Logger LOG = Log.logger(RestServer.class);
+
+    @POST
+    @Timed
+    @Path("/{name}")
+    @Status(Status.CREATED)
+    @Consumes(APPLICATION_JSON)
+    @Produces(APPLICATION_JSON_WITH_CHARSET)
+    public Map<String, Id> post(@Context GraphManager manager,
+                                @PathParam("graph") String graph,
+                                @PathParam("name") String algorithm,
+                                Map<String, Object> parameters) {
+        LOG.debug("Graph [{}] schedule algorithm job: {}", graph, parameters);
+        E.checkArgument(algorithm != null && !algorithm.isEmpty(),
+                        "The algorithm name can't be empty");
+        if (parameters == null) {
+            parameters = ImmutableMap.of();
+        }
+        if (!AlgorithmJob.check(algorithm, parameters)) {
+            throw new NotFoundException("Not found algorithm: " + algorithm);
+        }
+
+        HugeGraph g = graph(manager, graph);
+        Map<String, Object> input = ImmutableMap.of("algorithm", algorithm,
+                                                    "parameters", parameters);
+        JobBuilder<Object> builder = JobBuilder.of(g);
+        builder.name("algorithm:" + algorithm)
+               .input(JsonUtil.toJson(input))
+               .job(new AlgorithmJob());
+        return ImmutableMap.of("task_id", builder.schedule().id());
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/AlgorithmJob.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/AlgorithmJob.java
new file mode 100644
index 000000000..7e752ac42
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/AlgorithmJob.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job;
+
+import java.util.Map;
+
+import com.baidu.hugegraph.job.algorithm.Algorithm;
+import com.baidu.hugegraph.job.algorithm.AlgorithmPool;
+import com.baidu.hugegraph.util.E;
+import com.baidu.hugegraph.util.JsonUtil;
+
+public class AlgorithmJob extends Job<Object> {
+
+    public static final String TASK_TYPE = "algorithm";
+
+    // Returns false for an unknown name, else validates the parameters
+    public static boolean check(String name, Map<String, Object> parameters) {
+        Algorithm found = AlgorithmPool.instance().find(name);
+        if (found != null) {
+            found.checkParameters(parameters);
+        }
+        return found != null;
+    }
+
+    @Override
+    public String type() {
+        return TASK_TYPE;
+    }
+
+    @Override
+    public Object execute() throws Exception {
+        String json = this.task().input();
+        E.checkArgumentNotNull(json, "The input can't be null");
+        @SuppressWarnings("unchecked")
+        Map<String, Object> input = JsonUtil.fromJson(json, Map.class);
+
+        // Validate the type of each deserialized entry before casting it
+        Object name = input.get("algorithm");
+        E.checkArgument(name instanceof String,
+                        "Invalid algorithm name '%s'", name);
+
+        Object params = input.get("parameters");
+        E.checkArgument(params instanceof Map,
+                        "Invalid algorithm parameters '%s'", params);
+        @SuppressWarnings("unchecked")
+        Map<String, Object> parameters = (Map<String, Object>) params;
+
+        // Look up the algorithm by name and run it on behalf of this job
+        Algorithm algorithm = AlgorithmPool.instance().find((String) name);
+        E.checkArgument(algorithm != null,
+                        "There is no algorithm named '%s'", name);
+        return algorithm.call(this, parameters);
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
new file mode 100644
index 000000000..660ef9f8f
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
@@ -0,0 +1,516 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm;
+
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.baidu.hugegraph.HugeException;
+import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.backend.query.ConditionQuery;
+import com.baidu.hugegraph.backend.query.Query;
+import com.baidu.hugegraph.iterator.FilterIterator;
+import com.baidu.hugegraph.iterator.FlatMapperIterator;
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.testutil.Whitebox;
+import com.baidu.hugegraph.traversal.algorithm.HugeTraverser;
+import com.baidu.hugegraph.type.HugeType;
+import com.baidu.hugegraph.type.define.Directions;
+import com.baidu.hugegraph.type.define.HugeKeys;
+import com.baidu.hugegraph.util.Bytes;
+import com.baidu.hugegraph.util.E;
+import com.baidu.hugegraph.util.JsonUtil;
+
+import jersey.repackaged.com.google.common.base.Objects;
+
+@SuppressWarnings("deprecation") // StringEscapeUtils
+public abstract class AbstractAlgorithm implements Algorithm {
+
+    public static final long MAX_RESULT_SIZE = 100L * Bytes.MB; // JsonMap cap
+    public static final long MAX_QUERY_LIMIT = 10000000L; // about 10GB
+    public static final int BATCH = 500; // passed to commitIfGtSize
+    // Category names, returned by implementations of Algorithm.category()
+    public static final String CATEGORY_AGGR = "aggregate";
+    public static final String CATEGORY_PATH = "path";
+    public static final String CATEGORY_RANK = "rank";
+    public static final String CATEGORY_SIMI = "similarity";
+    public static final String CATEGORY_COMM = "community";
+    public static final String CATEGORY_CENT = "centrality";
+    // Keys looked up in the parameters map of an algorithm request
+    public static final String KEY_DIRECTION = "direction";
+    public static final String KEY_LABEL = "label";
+    public static final String KEY_DEPTH = "depth";
+    public static final String KEY_DEGREE = "degree";
+    public static final String KEY_SAMPLE = "sample";
+    public static final String KEY_SOURCE_SAMPLE = "source_sample";
+    public static final String KEY_SOURCE_LABEL = "source_label";
+    public static final String KEY_SOURCE_CLABEL = "source_clabel";
+    public static final String KEY_TOP = "top";
+    public static final String KEY_TIMES = "times";
+    public static final String KEY_STABLE_TIMES = "stable_times";
+    public static final String KEY_PRECISION = "precision";
+    public static final String KEY_SHOW_COMM = "show_community";
+    public static final String KEY_CLEAR = "clear";
+    public static final String KEY_CAPACITY = "capacity";
+    public static final String KEY_LIMIT = "limit";
+    // Defaults, e.g. degree() falls back to DEFAULT_DEGREE without "degree"
+    public static final long DEFAULT_CAPACITY = 10000000L;
+    public static final long DEFAULT_LIMIT = 100L;
+    public static final long DEFAULT_DEGREE = 100L;
+    public static final long DEFAULT_SAMPLE = 1L;
+    public static final long DEFAULT_TIMES = 20L;
+    public static final long DEFAULT_STABLE_TIMES= 3L;
+    public static final double DEFAULT_PRECISION = 1.0 / 1000;
+
+    @Override
+    public void checkParameters(Map<String, Object> parameters) {
+        E.checkArgument(parameters.isEmpty(),
+                        "Unnecessary parameters: %s", parameters);
+    }
+
+    protected static int depth(Map<String, Object> parameters) {
+        // The depth is mandatory and must be a positive number
+        int value = parameterInt(parameters, KEY_DEPTH);
+        E.checkArgument(value > 0, "The value of %s must be > 0, but got %s",
+                        KEY_DEPTH, value);
+        return value;
+    }
+
+    protected static String edgeLabel(Map<String, Object> parameters) {
+        // Optional parameter: null is returned when the key is absent,
+        // otherwise the value must be a non-null string
+        boolean given = parameters.containsKey(KEY_LABEL);
+        return given ? parameterString(parameters, KEY_LABEL) : null;
+    }
+
+    protected static Directions direction(Map<String, Object> parameters) {
+        // Mandatory parameter, must name one of OUT, IN or BOTH
+        return parseDirection(parameter(parameters, KEY_DIRECTION));
+    }
+
+    protected static long top(Map<String, Object> parameters) {
+        long value = 0L; // the result when no "top" parameter is given
+        if (parameters.containsKey(KEY_TOP)) {
+            value = parameterLong(parameters, KEY_TOP);
+            E.checkArgument(value >= 0L,
+                            "The value of %s must be >= 0, but got %s",
+                            KEY_TOP, value);
+        }
+        return value;
+    }
+
+    protected static long degree(Map<String, Object> parameters) {
+        long value = DEFAULT_DEGREE; // used as-is when the key is absent
+        if (parameters.containsKey(KEY_DEGREE)) {
+            value = parameterLong(parameters, KEY_DEGREE);
+            HugeTraverser.checkDegree(value);
+        }
+        return value;
+    }
+
+    protected static long capacity(Map<String, Object> parameters) {
+        long value = DEFAULT_CAPACITY; // used as-is when the key is absent
+        if (parameters.containsKey(KEY_CAPACITY)) {
+            value = parameterLong(parameters, KEY_CAPACITY);
+            HugeTraverser.checkCapacity(value);
+        }
+        return value;
+    }
+
+    protected static long limit(Map<String, Object> parameters) {
+        long value = DEFAULT_LIMIT; // used as-is when the key is absent
+        if (parameters.containsKey(KEY_LIMIT)) {
+            value = parameterLong(parameters, KEY_LIMIT);
+            HugeTraverser.checkLimit(value);
+        }
+        return value;
+    }
+
+    protected static long sample(Map<String, Object> parameters) {
+        long value = DEFAULT_SAMPLE; // used as-is when the key is absent
+        if (parameters.containsKey(KEY_SAMPLE)) {
+            value = parameterLong(parameters, KEY_SAMPLE);
+            HugeTraverser.checkPositiveOrNoLimit(value, KEY_SAMPLE);
+        }
+        return value;
+    }
+
+    protected static long sourceSample(Map<String, Object> parameters) {
+        long value = HugeTraverser.NO_LIMIT; // when the key is absent
+        if (parameters.containsKey(KEY_SOURCE_SAMPLE)) {
+            value = parameterLong(parameters, KEY_SOURCE_SAMPLE);
+            HugeTraverser.checkPositiveOrNoLimit(value, KEY_SOURCE_SAMPLE);
+        }
+        return value;
+    }
+
+    protected static String sourceLabel(Map<String, Object> parameters) {
+        // Optional parameter: null is returned when the key is absent,
+        // otherwise the value must be a non-null string
+        boolean given = parameters.containsKey(KEY_SOURCE_LABEL);
+        return given ? parameterString(parameters, KEY_SOURCE_LABEL) : null;
+    }
+
+    protected static String sourceCLabel(Map<String, Object> parameters) {
+        // Optional parameter: null is returned when the key is absent,
+        // otherwise the value must be a non-null string
+        boolean given = parameters.containsKey(KEY_SOURCE_CLABEL);
+        return given ? parameterString(parameters, KEY_SOURCE_CLABEL) : null;
+    }
+
+    public static Object parameter(Map<String, Object> parameters, String key) {
+        // A missing key and a null value are both rejected
+        Object found = parameters.get(key);
+        E.checkArgument(found != null, "Expect '%s' in parameters: %s",
+                        key, parameters);
+        return found;
+    }
+
+    public static String parameterString(Map<String, Object> parameters,
+                                         String key) {
+        Object value = parameter(parameters, key);
+        E.checkArgument(value instanceof String,
+                        "Expect string value for parameter '%s': '%s'",
+                        key, value);
+        return (String) value;
+    }
+
+    public static int parameterInt(Map<String, Object> parameters,
+                                   String key) {
+        Object value = parameter(parameters, key);
+        E.checkArgument(value instanceof Number,
+                        "Expect int value for parameter '%s': '%s'",
+                        key, value);
+        return ((Number) value).intValue();
+    }
+
+    public static long parameterLong(Map<String, Object> parameters,
+                                     String key) {
+        Object value = parameter(parameters, key);
+        E.checkArgument(value instanceof Number,
+                        "Expect long value for parameter '%s': '%s'",
+                        key, value);
+        return ((Number) value).longValue();
+    }
+
+    public static double parameterDouble(Map<String, Object> parameters,
+                                         String key) {
+        Object value = parameter(parameters, key);
+        E.checkArgument(value instanceof Number,
+                        "Expect double value for parameter '%s': '%s'",
+                        key, value);
+        return ((Number) value).doubleValue();
+    }
+
+    public static boolean parameterBoolean(Map<String, Object> parameters,
+                                           String key) {
+        Object value = parameter(parameters, key);
+        E.checkArgument(value instanceof Boolean,
+                        "Expect boolean value for parameter '%s': '%s'",
+                        key, value);
+        return ((Boolean) value);
+    }
+
+    public static Directions parseDirection(Object direction) {
+        if (direction.equals(Directions.BOTH.toString())) {
+            return Directions.BOTH;
+        } else if (direction.equals(Directions.OUT.toString())) {
+            return Directions.OUT;
+        } else if (direction.equals(Directions.IN.toString())) {
+            return Directions.IN;
+        } else {
+            throw new IllegalArgumentException(String.format(
+                      "The value of direction must be in [OUT, IN, BOTH], " +
+                      "but got '%s'", direction));
+        }
+    }
+
+    public static class AlgoTraverser extends HugeTraverser {
+
+        private final Job<Object> job;
+        protected long progress;
+
+        public AlgoTraverser(Job<Object> job) {
+            super(job.graph());
+            this.job = job;
+        }
+
+        public void updateProgress(long progress) {
+            this.job.updateProgress((int) progress);
+        }
+
+        protected Iterator<Vertex> vertices() {
+            return this.vertices(Query.NO_LIMIT);
+        }
+
+        protected Iterator<Vertex> vertices(long limit) {
+            Query query = new Query(HugeType.VERTEX);
+            query.capacity(Query.NO_CAPACITY);
+            query.limit(limit);
+            return this.graph().vertices(query);
+        }
+
+        protected Iterator<Vertex> vertices(Object label, String key,
+                                            Object value, long limit) {
+            Iterator<Vertex> vertices = this.vertices(label, limit);
+            if (key != null) {
+                vertices = filter(vertices, key, value);
+            }
+           return vertices;
+        }
+
+        protected Iterator<Vertex> vertices(Object label, long limit) {
+            if (label == null) {
+                return this.vertices(limit);
+            }
+            ConditionQuery query = new ConditionQuery(HugeType.VERTEX);
+            query.capacity(Query.NO_CAPACITY);
+            query.limit(limit);
+            if (label != null) {
+                query.eq(HugeKeys.LABEL, this.getVertexLabelId(label));
+            }
+            return this.graph().vertices(query);
+        }
+
+        protected Iterator<Vertex> vertices(Iterator<Object> ids) {
+            return new FlatMapperIterator<>(ids, id -> {
+                return this.graph().vertices(id);
+            });
+        }
+
+        protected Iterator<Vertex> filter(Iterator<Vertex> vertices,
+                                          String key, Object value) {
+            return new FilterIterator<>(vertices, vertex -> {
+                boolean matched = match(vertex, key, value);
+                if (!matched) {
+                    this.updateProgress(++this.progress);
+                }
+                return matched;
+            });
+        }
+
+        protected static boolean match(Element elem, String key, Object value) {
+            Property<Object> p = elem.property(key);
+            return p.isPresent() && Objects.equal(p.value(), value);
+        }
+
+        protected Iterator<Edge> edges(Directions dir) {
+            HugeType type = dir == null ? HugeType.EDGE : dir.type();
+            Query query = new Query(type);
+            query.capacity(Query.NO_CAPACITY);
+            query.limit(Query.NO_LIMIT);
+            return this.graph().edges(query);
+        }
+
+        protected void drop(GraphTraversal<?, ? extends Element> traversal) {
+            this.execute(traversal, () -> {
+                while (traversal.hasNext()) {
+                    this.updateProgress(++this.progress);
+                    traversal.next().remove();
+                    this.commitIfNeeded();
+                }
+                return null;
+            });
+            this.graph().tx().commit();
+        }
+
+        protected <V> V execute(GraphTraversal<?, ?> traversal,
+                                Callable<V> callback) {
+            long capacity = Query.defaultCapacity(MAX_QUERY_LIMIT);
+            try {
+                return callback.call();
+            } catch (Exception e) {
+                throw new HugeException("Failed to execute algorithm", e);
+            } finally {
+                Query.defaultCapacity(capacity);
+                try {
+                    traversal.close();
+                } catch (Exception e) {
+                    throw new HugeException("Can't close traversal", e);
+                }
+            }
+        }
+
+        protected void commitIfNeeded() {
+            // commit if needed
+            Transaction tx = this.graph().tx();
+            Whitebox.invoke(tx.getClass(), "commitIfGtSize", tx, BATCH);
+        }
+    }
+
+    public static final class TopMap {
+
+        private final long topN; // limit handed to HugeTraverser.topN()
+        private Map<Id, MutableLong> tops;
+
+        public TopMap(long topN) {
+            this.tops = new HashMap<>();
+            this.topN = topN;
+        }
+
+        public int size() {
+            return this.tops.size();
+        }
+
+        public void put(Id key, long value) {
+            this.put(key, Long.valueOf(value));
+        }
+
+        public void put(Id key, Long value) {
+            this.tops.put(key, new MutableLong(value));
+            // Let the map grow beyond 2 * topN before trimming it back
+            if (2 * this.topN < this.tops.size()) {
+                this.shrinkIfNeeded(this.topN);
+            }
+        }
+
+        public Set<Map.Entry<Id, MutableLong>> entrySet() {
+            this.shrinkIfNeeded(this.topN); // trim before exposing entries
+            return this.tops.entrySet();
+        }
+
+        private void shrinkIfNeeded(long limit) {
+            if (limit != HugeTraverser.NO_LIMIT && this.tops.size() >= limit) {
+                this.tops = HugeTraverser.topN(this.tops, true, limit);
+            }
+        }
+    }
+
+    public static final class JsonMap {
+
+        private final StringBuilder json; // the JSON text built so far
+
+        public JsonMap() {
+            this(4 * (int) Bytes.KB);
+        }
+
+        public JsonMap(int initCapacity) {
+            this.json = new StringBuilder(initCapacity);
+        }
+
+        public void startObject() {
+            this.json.append('{');
+        }
+
+        public void endObject() {
+            this.deleteLastComma(); // drop the separator of the last entry
+            this.json.append('}');
+        }
+
+        public void startList() {
+            this.json.append('[');
+        }
+
+        public void endList() {
+            this.deleteLastComma(); // drop the separator of the last item
+            this.json.append(']');
+        }
+
+        public void deleteLastComma() {
+            int last = this.json.length() - 1;
+            if (last >= 0 && this.json.charAt(last) == ',') {
+                this.json.deleteCharAt(last);
+            }
+        }
+
+        public void appendKey(String key) {
+            this.appendString(key).append(':');
+        }
+
+        public void append(long value) {
+            this.json.append(value).append(',');
+            this.checkSizeLimit();
+        }
+
+        public void append(String value) {
+            this.appendString(value).append(',');
+            this.checkSizeLimit();
+        }
+
+        public void append(Object key, long value) {
+            this.append(key.toString(), value);
+        }
+
+        public void append(String key, long value) {
+            this.appendKey(key);
+            this.json.append(value).append(',');
+            this.checkSizeLimit();
+        }
+
+        public void append(Object key, Number value) {
+            this.append(key.toString(), value);
+        }
+
+        public void append(String key, Number value) {
+            this.appendKey(key);
+            this.json.append(value).append(',');
+            this.checkSizeLimit();
+        }
+
+        public void append(String key, String value) {
+            this.appendKey(key);
+            this.appendString(value).append(',');
+            this.checkSizeLimit();
+        }
+
+        public void appendRaw(String key, String rawJson) {
+            this.appendKey(key);
+            // rawJson is copied verbatim, without quoting or escaping
+            this.json.append(rawJson).append(',');
+            this.checkSizeLimit();
+        }
+
+        public void append(Set<Entry<Id, MutableLong>> kvs) {
+            kvs.forEach(kv -> this.append(kv.getKey(), kv.getValue()));
+        }
+
+        private StringBuilder appendString(String str) {
+            // Quotes, backslashes and control chars all need JSON escaping
+            if (str.chars().anyMatch(c -> c == '"' || c == '\\' || c < ' ')) {
+                str = StringEscapeUtils.escapeJson(str);
+            }
+            return this.json.append('"').append(str).append('"');
+        }
+
+        public void checkSizeLimit() {
+            E.checkArgument(this.json.length() < MAX_RESULT_SIZE,
+                            "The result size exceeds limit %s",
+                            MAX_RESULT_SIZE);
+        }
+
+        public Object asJson() {
+            return JsonUtil.asJson(this.json.toString());
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Algorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Algorithm.java
new file mode 100644
index 000000000..6ad200157
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Algorithm.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm;
+
+import java.util.Map;
+
+import com.baidu.hugegraph.job.Job;
+
+public interface Algorithm {
+    // Key under which AlgorithmPool registers and finds this algorithm
+    public String name();
+    // Category of the algorithm, e.g. AbstractAlgorithm.CATEGORY_AGGR
+    public String category();
+    // Run the algorithm on behalf of the given job with its parameters
+    public Object call(Job<Object> job, Map<String, Object> parameters);
+    // Validate the parameters; AlgorithmJob.check() calls this up front
+    public void checkParameters(Map<String, Object> parameters);
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java
new file mode 100644
index 000000000..98f7c89dc
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.baidu.hugegraph.job.algorithm.cent.BetweenessCentralityAlgorithm;
+import com.baidu.hugegraph.job.algorithm.cent.ClosenessCentralityAlgorithm;
+import com.baidu.hugegraph.job.algorithm.cent.DegreeCentralityAlgorithm;
+import com.baidu.hugegraph.job.algorithm.cent.EigenvectorCentralityAlgorithm;
+import com.baidu.hugegraph.job.algorithm.comm.ClusterCoeffcientAlgorithm;
+import com.baidu.hugegraph.job.algorithm.comm.LouvainAlgorithm;
+import com.baidu.hugegraph.job.algorithm.comm.LpaAlgorithm;
+import com.baidu.hugegraph.job.algorithm.comm.TriangleCountAlgorithm;
+
+public class AlgorithmPool {
+
+    private static final AlgorithmPool INSTANCE = new AlgorithmPool();
+
+    static {
+        INSTANCE.register(new CountVertexAlgorithm());
+        INSTANCE.register(new CountEdgeAlgorithm());
+
+        INSTANCE.register(new DegreeCentralityAlgorithm());
+        INSTANCE.register(new BetweenessCentralityAlgorithm());
+        INSTANCE.register(new ClosenessCentralityAlgorithm());
+        INSTANCE.register(new EigenvectorCentralityAlgorithm());
+
+        INSTANCE.register(new TriangleCountAlgorithm());
+        INSTANCE.register(new ClusterCoeffcientAlgorithm());
+        INSTANCE.register(new LpaAlgorithm());
+        INSTANCE.register(new LouvainAlgorithm());
+    }
+
+    private final Map<String, Algorithm> algorithms;
+
+    public AlgorithmPool() {
+        this.algorithms = new ConcurrentHashMap<>();
+    }
+
+    public Algorithm register(Algorithm algo) {
+        assert !this.algorithms.containsKey(algo.name());
+        return this.algorithms.put(algo.name(), algo);
+    }
+
+    public Algorithm find(String name) {
+        return this.algorithms.get(name);
+    }
+
+    public static AlgorithmPool instance() {
+        return INSTANCE;
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java
new file mode 100644
index 000000000..9fb122348
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.util.JsonUtil;
+
+/**
+ * Aggregation algorithm registered under the name "count_edge": it counts
+ * the edges returned by the traverser, grouped by edge label.
+ */
+public class CountEdgeAlgorithm extends AbstractAlgorithm {
+
+    @Override
+    public String name() {
+        return "count_edge";
+    }
+
+    @Override
+    public String category() {
+        return CATEGORY_AGGR;
+    }
+
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        return new Traverser(job).count();
+    }
+
+    private static class Traverser extends AlgoTraverser {
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Iterate over the edges and count them per label.
+         *
+         * @return a JSON string mapping each label to its edge count, plus
+         *         the key "*" mapped to the total number of edges
+         */
+        public Object count() {
+            Map<String, MutableLong> labelCounts = new HashMap<>();
+            long seen = 0L;
+
+            for (Iterator<Edge> iter = this.edges(null); iter.hasNext();) {
+                String label = iter.next().label();
+                // Start a label at 0 so that the increment yields 1
+                labelCounts.computeIfAbsent(label, k -> new MutableLong(0L))
+                           .increment();
+                this.updateProgress(++seen);
+            }
+            labelCounts.put("*", new MutableLong(seen));
+
+            return JsonUtil.asJson(labelCounts);
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java
new file mode 100644
index 000000000..582e0bb69
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.util.JsonUtil;
+
+/**
+ * Aggregation algorithm registered under the name "count_vertex": it counts
+ * the vertices returned by the traverser, grouped by vertex label.
+ */
+public class CountVertexAlgorithm extends AbstractAlgorithm {
+
+    @Override
+    public String name() {
+        return "count_vertex";
+    }
+
+    @Override
+    public String category() {
+        return CATEGORY_AGGR;
+    }
+
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        return new Traverser(job).count();
+    }
+
+    private static class Traverser extends AlgoTraverser {
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Iterate over the vertices and count them per label.
+         *
+         * @return a JSON string mapping each label to its vertex count, plus
+         *         the key "*" mapped to the total number of vertices
+         */
+        public Object count() {
+            Map<String, MutableLong> labelCounts = new HashMap<>();
+            long seen = 0L;
+
+            for (Iterator<Vertex> iter = this.vertices(); iter.hasNext();) {
+                String label = iter.next().label();
+                // Start a label at 0 so that the increment yields 1
+                labelCounts.computeIfAbsent(label, k -> new MutableLong(0L))
+                           .increment();
+                this.updateProgress(++seen);
+            }
+            labelCounts.put("*", new MutableLong(seen));
+
+            return JsonUtil.asJson(labelCounts);
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java
new file mode 100644
index 000000000..14841043a
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.cent;
+
+import java.util.Map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
+import com.baidu.hugegraph.job.algorithm.comm.LpaAlgorithm;
+
+/**
+ * Base class of the centrality algorithms: it fixes the category and offers
+ * a shared parameter check plus the Gremlin traversal builders used by the
+ * path-based subclasses.
+ */
+public abstract class AbstractCentAlgorithm extends AbstractAlgorithm {
+
+    protected static final String C_LABEL = LpaAlgorithm.Traverser.C_LABEL;
+
+    @Override
+    public String category() {
+        return CATEGORY_CENT;
+    }
+
+    /**
+     * Parse each parameter once and discard the value.
+     * NOTE(review): presumably the parsers reject invalid input, which is
+     * what makes this a check - confirm in AbstractAlgorithm.
+     */
+    @Override
+    public void checkParameters(Map<String, Object> parameters) {
+        depth(parameters);
+        degree(parameters);
+        sample(parameters);
+        sourceSample(parameters);
+        sourceLabel(parameters);
+        sourceCLabel(parameters);
+        top(parameters);
+    }
+
+    public static class Traverser extends AlgoTraverser {
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Build the traversal that selects the source vertices.
+         *
+         * @param sourceLabel  only keep vertices with this label, or
+         *                     {@code null} for no label filter
+         * @param sourceSample number of source vertices to sample, the
+         *                     sample step is skipped if not positive
+         * @param sourceCLabel value to match against {@code C_LABEL}, or
+         *                     {@code null} to keep every vertex
+         * @return a traversal over the source vertices whose sack starts
+         *         at {@code 1f}
+         */
+        protected GraphTraversal<Vertex, Vertex> constructSource(
+                                                 String sourceLabel,
+                                                 long sourceSample,
+                                                 String sourceCLabel) {
+            GraphTraversal<Vertex, Vertex> t = this.graph().traversal()
+                                                           .withSack(1f).V();
+
+            if (sourceLabel != null) {
+                t = t.hasLabel(sourceLabel);
+            }
+
+            t = t.filter(it -> {
+                // Progress is counted for every vertex reaching the filter
+                this.updateProgress(++this.progress);
+                return sourceCLabel == null ||
+                       match(it.get(), C_LABEL, sourceCLabel);
+            });
+
+            if (sourceSample > 0L) {
+                t = t.sample(toSampleSize(sourceSample));
+            }
+
+            return t;
+        }
+
+        /**
+         * Append the repeated expansion step to the source traversal: the
+         * start and every expanded vertex are labelled "v" and only simple
+         * paths are followed. The caller is expected to add the loop
+         * modulators (emit/until/times) to the returned traversal.
+         */
+        protected GraphTraversal<Vertex, Vertex> constructPath(
+                  GraphTraversal<Vertex, Vertex> t, long degree,
+                  long sample, String sourceLabel, String sourceCLabel) {
+            GraphTraversal<?, Vertex> unit = constructPathUnit(degree, sample,
+                                                               sourceLabel,
+                                                               sourceCLabel);
+            t = t.as("v").repeat(__.local(unit).simplePath().as("v"));
+
+            return t;
+        }
+
+        /**
+         * Build the one-hop expansion: adjacent vertices in both
+         * directions, optionally filtered by label and by {@code C_LABEL},
+         * then limited to {@code degree} (unless it is {@code NO_LIMIT})
+         * and finally sampled (if {@code sample} is positive).
+         */
+        protected GraphTraversal<Vertex, Vertex> constructPathUnit(
+                                                 long degree, long sample,
+                                                 String sourceLabel,
+                                                 String sourceCLabel) {
+            GraphTraversal<Vertex, Vertex> unit = __.both();
+            if (sourceLabel != null) {
+                unit = unit.hasLabel(sourceLabel);
+            }
+            if (sourceCLabel != null) {
+                unit = unit.has(C_LABEL, sourceCLabel);
+            }
+            if (degree != NO_LIMIT) {
+                unit = unit.limit(degree);
+            }
+            if (sample > 0L) {
+                unit = unit.sample(toSampleSize(sample));
+            }
+            return unit;
+        }
+
+        /**
+         * Convert a positive sample size to the int expected by the sample
+         * step. A plain {@code (int)} cast wraps around for values above
+         * {@code Integer.MAX_VALUE} (possibly to a negative number), so
+         * such values are clamped instead.
+         */
+        private static int toSampleSize(long sample) {
+            return (int) Math.min(sample, Integer.MAX_VALUE);
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java
new file mode 100644
index 000000000..ae1b8bb74
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.cent;
+
+import java.util.Map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
+import org.apache.tinkerpop.gremlin.process.traversal.Pop;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.structure.Column;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.baidu.hugegraph.job.Job;
+
+/**
+ * Centrality algorithm registered under the name "betweeness_centrality".
+ *
+ * <p>It builds a Gremlin traversal that walks paths starting from the source
+ * vertices and counts how often each vertex id appears on the kept paths.
+ */
+public class BetweenessCentralityAlgorithm extends AbstractCentAlgorithm {
+
+    @Override
+    public String name() {
+        return "betweeness_centrality";
+    }
+
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        Traverser traverser = new Traverser(job);
+        return traverser.betweenessCentrality(depth(parameters),
+                                              degree(parameters),
+                                              sample(parameters),
+                                              sourceLabel(parameters),
+                                              sourceSample(parameters),
+                                              sourceCLabel(parameters),
+                                              top(parameters));
+    }
+
+    private static class Traverser extends AbstractCentAlgorithm.Traverser {
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Build and execute the betweeness traversal.
+         *
+         * @param depth        loop count at which the path expansion stops
+         * @param degree       passed to the path builder as neighbor limit
+         * @param sample       passed to the path builder as neighbor sample
+         * @param sourceLabel  label filter, may be null
+         * @param sourceSample number of source vertices to sample
+         * @param sourceCLabel C_LABEL filter, may be null
+         * @param topN         number of entries to keep, all if not positive
+         * @return the result of executing the traversal: vertex id counts
+         *         ordered by count descending
+         */
+        public Object betweenessCentrality(int depth,
+                                           long degree,
+                                           long sample,
+                                           String sourceLabel,
+                                           long sourceSample,
+                                           String sourceCLabel,
+                                           long topN) {
+            assert depth > 0;
+            assert degree > 0L;
+            assert topN >= 0L;
+
+            GraphTraversal<Vertex, Vertex> t = constructSource(sourceLabel,
+                                                               sourceSample,
+                                                               sourceCLabel);
+            t = constructPath(t, degree, sample, sourceLabel, sourceCLabel);
+            // Emit every intermediate path until the loop count hits depth
+            t = t.emit().until(__.loops().is(P.gte(depth)));
+
+            /*
+             * For every emitted path build a triple of:
+             *   x = id of the first "v", y = id of the last "v",
+             *   z = number of "v" on the path.
+             * The coalesce() looks in the "triples" side-effect for a triple
+             * with the same (x, y), if none is found the current triple is
+             * stored. The path is kept only if its z equals the z of the
+             * triple picked by coalesce().
+             * NOTE(review): this looks like it keeps the paths that are as
+             * long as the first one seen per (x, y) pair, presumably the
+             * shortest ones - confirm against the traversal order.
+             */
+            @SuppressWarnings({ "unchecked", "deprecation" })
+            GraphTraversal<Vertex, Vertex> tf = t.filter(
+                    __.project("x","y","z")
+                           .by(__.select(Pop.first, "v").id())
+                           .by(__.select(Pop.last, "v").id())
+                           .by(__.select(Pop.all, "v").count(Scope.local))
+                      .as("triple")
+                      .coalesce(__.select("x","y").as("a")
+                                  .select("triples").unfold().as("t")
+                                  .select("x","y").where(P.eq("a")).select("t"),
+                                __.store("triples"))
+                      .select("z").as("length")
+                      .select("triple").select("z").where(P.eq("length")));
+
+            // Count each vertex id over all kept paths, largest count first
+            GraphTraversal<Vertex, ?> tg = tf.select(Pop.all, "v")
+                                             .unfold().id()
+                                             .groupCount().order(Scope.local)
+                                             .by(Column.values, Order.desc);
+            // A non-positive topN means no limit on the number of entries
+            GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tg :
+                                               tg.limit(Scope.local, topN);
+
+            return this.execute(tLimit, () -> tLimit.next());
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java
new file mode 100644
index 000000000..d890db808
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.cent;
+
+import java.util.Map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Operator;
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
+import org.apache.tinkerpop.gremlin.process.traversal.Pop;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.structure.Column;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.baidu.hugegraph.job.Job;
+
+/**
+ * Centrality algorithm registered under the name "closeness_centrality".
+ *
+ * <p>It builds a Gremlin traversal that walks paths starting from the source
+ * vertices and, per source vertex, sums the sack value divided by the number
+ * of vertices on each kept path.
+ */
+public class ClosenessCentralityAlgorithm extends AbstractCentAlgorithm {
+
+    // NOTE(review): these two constants are not referenced in this class
+    public static final long DEFAULT_DEGREE = 100L;
+    public static final long DEFAULT_SAMPLE = 1L;
+
+    @Override
+    public String name() {
+        return "closeness_centrality";
+    }
+
+    /**
+     * Check all the parameters consumed by {@link #call(Job, Map)}.
+     *
+     * <p>Checking only the depth here would let an invalid degree, sample,
+     * source filter or top parameter pass the check and only surface once
+     * {@code call()} parses it, so the check is delegated to the
+     * superclass which covers the full parameter set.
+     */
+    @Override
+    public void checkParameters(Map<String, Object> parameters) {
+        super.checkParameters(parameters);
+    }
+
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        Traverser traverser = new Traverser(job);
+        return traverser.closenessCentrality(depth(parameters),
+                                             degree(parameters),
+                                             sample(parameters),
+                                             sourceLabel(parameters),
+                                             sourceSample(parameters),
+                                             sourceCLabel(parameters),
+                                             top(parameters));
+    }
+
+    private static class Traverser extends AbstractCentAlgorithm.Traverser {
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Build and execute the closeness traversal.
+         *
+         * @param depth        loop count at which the path expansion stops
+         * @param degree       passed to the path builder as neighbor limit
+         * @param sample       passed to the path builder as neighbor sample
+         * @param sourceLabel  label filter, may be null
+         * @param sourceSample number of source vertices to sample
+         * @param sourceCLabel C_LABEL filter, may be null
+         * @param topN         number of entries to keep, all if not positive
+         * @return the result of executing the traversal: a score per source
+         *         vertex id ordered by score descending
+         */
+        public Object closenessCentrality(int depth,
+                                          long degree,
+                                          long sample,
+                                          String sourceLabel,
+                                          long sourceSample,
+                                          String sourceCLabel,
+                                          long topN) {
+            assert depth > 0;
+            assert degree > 0L;
+            assert topN >= 0L;
+
+            GraphTraversal<Vertex, Vertex> t = constructSource(sourceLabel,
+                                                               sourceSample,
+                                                               sourceCLabel);
+            t = constructPath(t, degree, sample, sourceLabel, sourceCLabel);
+            // Emit every intermediate path until the loop count hits depth
+            t = t.emit().until(__.loops().is(P.gte(depth)));
+
+            /*
+             * For every emitted path build a triple of:
+             *   x = id of the first "v", y = id of the last "v",
+             *   z = number of "v" on the path.
+             * The coalesce() looks in the "triples" side-effect for a triple
+             * with the same (x, y), if none is found the current triple is
+             * stored. The path is kept only if its z equals the z of the
+             * triple picked by coalesce().
+             * NOTE(review): this looks like it keeps the paths that are as
+             * long as the first one seen per (x, y) pair, presumably the
+             * shortest ones - confirm against the traversal order.
+             */
+            @SuppressWarnings({ "unchecked", "deprecation" })
+            GraphTraversal<Vertex, Vertex> tf = t.filter(
+                    __.project("x","y","z")
+                           .by(__.select(Pop.first, "v").id())
+                           .by(__.select(Pop.last, "v").id())
+                           .by(__.select(Pop.all, "v").count(Scope.local))
+                      .as("triple")
+                      .coalesce(__.select("x","y").as("a")
+                                  .select("triples").unfold().as("t")
+                                  .select("x","y").where(P.eq("a")).select("t"),
+                                __.store("triples"))
+                      .select("z").as("length")
+                      .select("triple").select("z").where(P.eq("length")));
+
+            /*
+             * Group the kept paths by the id of their first vertex; each
+             * path contributes its sack divided by the number of "v" on the
+             * path, and the contributions are summed per group.
+             */
+            GraphTraversal<Vertex, ?> tg;
+            tg = tf.group().by(__.select(Pop.first, "v").id())
+                           .by(__.select(Pop.all, "v").count(Scope.local)
+                                 .sack(Operator.div).sack().sum())
+                           .order(Scope.local).by(Column.values, Order.desc);
+            // A non-positive topN means no limit on the number of entries
+            GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tg :
+                                               tg.limit(Scope.local, topN);
+
+            return this.execute(tLimit, () -> tLimit.next());
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java
new file mode 100644
index 000000000..81bd33672
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.cent;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.structure.HugeEdge;
+import com.baidu.hugegraph.type.define.Directions;
+
+/**
+ * Centrality algorithm registered under the name "degree_centrality": it
+ * reports the number of edges per vertex, either for a single direction or
+ * for both directions.
+ */
+public class DegreeCentralityAlgorithm extends AbstractCentAlgorithm {
+
+    @Override
+    public String name() {
+        return "degree_centrality";
+    }
+
+    @Override
+    public void checkParameters(Map<String, Object> parameters) {
+        direction(parameters);
+        top(parameters);
+    }
+
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        Traverser algo = new Traverser(job);
+        return algo.degreeCentrality(direction(parameters), top(parameters));
+    }
+
+    private static class Traverser extends AlgoTraverser {
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Compute the degree of each vertex for the given direction.
+         *
+         * <p>Consecutive edges with the same owner vertex are counted as
+         * one run, and one entry is written per run.
+         *
+         * @param direction OUT or IN, null and BOTH are handled by
+         *                  {@link #degreeCentrality(long)}
+         * @param topN      keep the entries through a TopMap of this size,
+         *                  or write every entry if not positive
+         * @return the degrees as JSON
+         */
+        public Object degreeCentrality(Directions direction, long topN) {
+            if (direction == null || direction == Directions.BOTH) {
+                return degreeCentrality(topN);
+            }
+            assert direction == Directions.OUT || direction == Directions.IN;
+            assert topN >= 0L;
+
+            final boolean keepAll = topN <= 0L;
+            Iterator<Edge> iter = this.edges(direction);
+
+            JsonMap result = new JsonMap();
+            TopMap tops = new TopMap(topN);
+            Id current = null;
+            long count = 0L;
+            long scanned = 0L;
+
+            result.startObject();
+            while (iter.hasNext()) {
+                HugeEdge edge = (HugeEdge) iter.next();
+                this.updateProgress(++scanned);
+
+                Id owner = edge.ownerVertex().id();
+                if (owner.equals(current)) {
+                    // Still inside the run of the current owner vertex
+                    count++;
+                } else {
+                    // A new run starts: flush the finished one, if any
+                    if (current != null) {
+                        if (keepAll) {
+                            result.append(current, count);
+                        } else {
+                            tops.put(current, count);
+                        }
+                    }
+                    current = owner;
+                    count = 1L;
+                }
+            }
+
+            // Flush the last run, then write the collected top entries
+            if (current != null) {
+                if (keepAll) {
+                    result.append(current, count);
+                } else {
+                    tops.put(current, count);
+                    result.append(tops.entrySet());
+                }
+            }
+
+            result.endObject();
+
+            return result.asJson();
+        }
+
+        /**
+         * Compute the degree of each vertex over both directions by
+         * counting its edges with one traversal per vertex.
+         *
+         * @param topN keep the entries through a TopMap of this size, or
+         *             write every entry if not positive
+         * @return the degrees as JSON
+         */
+        protected Object degreeCentrality(long topN) {
+            assert topN >= 0L;
+            long scanned = 0L;
+            JsonMap result = new JsonMap();
+            TopMap tops = new TopMap(topN);
+
+            GraphTraversalSource g = this.graph().traversal();
+            Iterator<Vertex> iter = this.vertices();
+
+            result.startObject();
+            while (iter.hasNext()) {
+                Vertex source = iter.next();
+                this.updateProgress(++scanned);
+
+                Long degree = g.V(source).bothE().count().next();
+                if (topN > 0L) {
+                    tops.put((Id) source.id(), degree);
+                } else {
+                    result.append(source.id(), degree);
+                }
+            }
+
+            if (tops.size() > 0) {
+                result.append(tops.entrySet());
+            }
+            result.endObject();
+
+            return result.asJson();
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java
new file mode 100644
index 000000000..d87fc7931
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.cent;
+
+import java.util.Map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.structure.Column;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.baidu.hugegraph.job.Job;
+
+/**
+ * Centrality algorithm registered under the name "eigenvector_centrality".
+ *
+ * <p>It builds a Gremlin traversal that repeatedly expands from the source
+ * vertices and counts, by vertex id, the traversers seen at each iteration.
+ */
+public class EigenvectorCentralityAlgorithm extends AbstractCentAlgorithm {
+
+    // NOTE(review): these two constants are not referenced in this class
+    public static final long DEFAULT_DEGREE = 100L;
+    public static final long DEFAULT_SAMPLE = 1L;
+
+    @Override
+    public String name() {
+        return "eigenvector_centrality";
+    }
+
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        Traverser traverser = new Traverser(job);
+        return traverser.eigenvectorCentrality(depth(parameters),
+                                               degree(parameters),
+                                               sample(parameters),
+                                               sourceLabel(parameters),
+                                               sourceSample(parameters),
+                                               sourceCLabel(parameters),
+                                               top(parameters));
+    }
+
+    private static class Traverser extends AbstractCentAlgorithm.Traverser {
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Build and execute the eigenvector traversal.
+         *
+         * @param depth        number of times the expansion is repeated
+         * @param degree       passed to the path unit as neighbor limit
+         * @param sample       passed to the path unit as neighbor sample
+         * @param sourceLabel  label filter, may be null
+         * @param sourceSample number of source vertices to sample
+         * @param sourceCLabel C_LABEL filter, may be null
+         * @param topN         number of entries to keep, all if not positive
+         * @return the result of executing the traversal: the "m" counts
+         *         ordered by count descending
+         */
+        public Object eigenvectorCentrality(int depth,
+                                            long degree,
+                                            long sample,
+                                            String sourceLabel,
+                                            long sourceSample,
+                                            String sourceCLabel,
+                                            long topN) {
+            assert depth > 0;
+            assert degree > 0L;
+            assert topN >= 0L;
+
+            // TODO: support parameters: Directions dir, String label
+            /*
+             * g.V().repeat(groupCount('m').by(id)
+             *              .local(both().limit(50).sample(1))
+             *              .simplePath())
+             *      .times(4).cap('m')
+             *      .order(local).by(values, desc)
+             *      .limit(local, 100)
+             */
+
+            GraphTraversal<Vertex, Vertex> t = constructSource(sourceLabel,
+                                                               sourceSample,
+                                                               sourceCLabel);
+            GraphTraversal<?, Vertex> unit = constructPathUnit(degree, sample,
+                                                               sourceLabel,
+                                                               sourceCLabel);
+            // Each iteration counts the current vertex id into the "m"
+            // side-effect before expanding along simple paths
+            t = t.repeat(__.groupCount("m").by(T.id)
+                           .local(unit).simplePath()).times(depth);
+
+            // Read the accumulated "m" counts, largest count first
+            GraphTraversal<Vertex, Object> tCap;
+            tCap = t.cap("m").order(Scope.local).by(Column.values, Order.desc);
+            // A non-positive topN means no limit on the number of entries
+            GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tCap :
+                                               tCap.limit(Scope.local, topN);
+
+            return this.execute(tLimit, () -> tLimit.next());
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/AbstractCommAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/AbstractCommAlgorithm.java
new file mode 100644
index 000000000..74b884a06
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/AbstractCommAlgorithm.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.comm;
+
+import java.util.Map;
+
+import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
+import com.baidu.hugegraph.traversal.algorithm.HugeTraverser;
+import com.baidu.hugegraph.util.E;
+
+/**
+ * Base class of the community algorithms: fixes the category and offers
+ * shared parsers for the iteration, precision and show-community parameters.
+ */
+public abstract class AbstractCommAlgorithm extends AbstractAlgorithm {
+
+    // Upper bound accepted for both iteration-count parameters
+    private static final int MAX_TIMES = 2048;
+
+    @Override
+    public String category() {
+        return CATEGORY_COMM;
+    }
+
+    /**
+     * Parse the KEY_TIMES parameter.
+     * @return DEFAULT_TIMES if the key is absent, otherwise the validated
+     *         value, which must not exceed MAX_TIMES
+     */
+    protected static int times(Map<String, Object> parameters) {
+        boolean absent = !parameters.containsKey(KEY_TIMES);
+        if (absent) {
+            return (int) DEFAULT_TIMES;
+        }
+        int value = parameterInt(parameters, KEY_TIMES);
+        HugeTraverser.checkPositiveOrNoLimit(value, KEY_TIMES);
+        boolean withinLimit = value <= MAX_TIMES;
+        E.checkArgument(withinLimit,
+                        "The maximum number of iterations is %s, but got %s",
+                        MAX_TIMES, value);
+        return value;
+    }
+
+    /**
+     * Parse the KEY_STABLE_TIMES parameter.
+     * @return DEFAULT_STABLE_TIMES if the key is absent, otherwise the
+     *         validated value, which must not exceed MAX_TIMES
+     */
+    protected static int stableTimes(Map<String, Object> parameters) {
+        boolean absent = !parameters.containsKey(KEY_STABLE_TIMES);
+        if (absent) {
+            return (int) DEFAULT_STABLE_TIMES;
+        }
+        int value = parameterInt(parameters, KEY_STABLE_TIMES);
+        HugeTraverser.checkPositiveOrNoLimit(value, KEY_STABLE_TIMES);
+        boolean withinLimit = value <= MAX_TIMES;
+        E.checkArgument(withinLimit,
+                        "The maximum number of stable iterations is %s, " +
+                        "but got %s", MAX_TIMES, value);
+        return value;
+    }
+
+    /**
+     * Parse the KEY_PRECISION parameter.
+     * @return DEFAULT_PRECISION if the key is absent, otherwise a value
+     *         strictly between 0 and 1
+     */
+    protected static double precision(Map<String, Object> parameters) {
+        boolean absent = !parameters.containsKey(KEY_PRECISION);
+        if (absent) {
+            return DEFAULT_PRECISION;
+        }
+        double value = parameterDouble(parameters, KEY_PRECISION);
+        boolean inRange = value > 0d && value < 1d;
+        E.checkArgument(inRange,
+                        "The %s parameter must be in range(0,1), but got: %s",
+                        KEY_PRECISION, value);
+        return value;
+    }
+
+    /**
+     * Parse the KEY_SHOW_COMM parameter.
+     * @return the parameter as string, or null if the key is absent
+     */
+    protected static String showCommunity(Map<String, Object> parameters) {
+        return parameters.containsKey(KEY_SHOW_COMM) ?
+               parameterString(parameters, KEY_SHOW_COMM) : null;
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java
new file mode 100644
index 000000000..cc893fc1f
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.comm;
+
+import java.util.Map;
+
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.type.define.Directions;
+import com.baidu.hugegraph.util.InsertionOrderUtil;
+
+/**
+ * Cluster coefficient algorithm: derives triangles / triads from the
+ * counts produced by the triangle-count traverser.
+ */
+public class ClusterCoeffcientAlgorithm extends AbstractCommAlgorithm {
+
+    @Override
+    public String name() {
+        return "cluster_coeffcient";
+    }
+
+    /**
+     * Validate the parameters by parsing the direction and the degree.
+     */
+    @Override
+    public void checkParameters(Map<String, Object> parameters) {
+        direction(parameters);
+        degree(parameters);
+    }
+
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        Traverser worker = new Traverser(job);
+        return worker.clusterCoeffcient(direction(parameters),
+                                        degree(parameters));
+    }
+
+    private static class Traverser extends TriangleCountAlgorithm.Traverser {
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Run the triangle counting, take the triangles and triads entries
+         * out of the result map, and put the "cluster_coeffcient" entry
+         * (0 if there are no triads) into the same map.
+         */
+        public Object clusterCoeffcient(Directions direction, long degree) {
+            Map<String, Long> counts = this.triangles(direction, degree);
+            counts = InsertionOrderUtil.newMap(counts);
+
+            long triangleCount = counts.remove(KEY_TRIANGLES);
+            long triadCount = counts.remove(KEY_TRIADS);
+            assert triangleCount <= triadCount;
+
+            double ratio;
+            if (triadCount == 0L) {
+                ratio = 0d;
+            } else {
+                ratio = 1d * triangleCount / triadCount;
+            }
+
+            // Reuse the same map instance to carry the double-typed result
+            @SuppressWarnings({ "unchecked", "rawtypes" })
+            Map<String, Double> output = (Map) counts;
+            output.put("cluster_coeffcient", ratio);
+
+            return counts;
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java
new file mode 100644
index 000000000..3f6de63e8
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.comm;
+
+import java.util.Map;
+
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.util.E;
+
+/**
+ * Louvain community detection job. Depending on the parameters it either
+ * clears the data of a pass, shows the members of a community, or runs the
+ * louvain iterations.
+ */
+public class LouvainAlgorithm extends AbstractCommAlgorithm {
+
+    @Override
+    public String name() {
+        return "louvain";
+    }
+
+    /**
+     * Validate all supported parameters by parsing each of them once.
+     */
+    @Override
+    public void checkParameters(Map<String, Object> parameters) {
+        times(parameters);
+        stableTimes(parameters);
+        precision(parameters);
+        degree(parameters);
+        sourceLabel(parameters);
+        sourceCLabel(parameters);
+        showCommunity(parameters);
+        clearPass(parameters);
+    }
+
+    /**
+     * Execute the job: the clear parameter takes precedence over the
+     * show-community parameter, which takes precedence over a normal run.
+     * The graph transaction is rolled back before any failure is rethrown.
+     */
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        String label = sourceLabel(parameters);
+        String clabel = sourceCLabel(parameters);
+        long degree = degree(parameters);
+
+        LouvainTraverser traverser = new LouvainTraverser(job, degree,
+                                                          label, clabel);
+        Long clearPass = clearPass(parameters);
+        String showComm = showCommunity(parameters);
+        try {
+            if (clearPass != null) {
+                // Safe narrowing: clearPass() bounds it by Integer.MAX_VALUE
+                return traverser.clearPass(clearPass.intValue());
+            } else if (showComm != null) {
+                return traverser.showCommunity(showComm);
+            } else {
+                return traverser.louvain(times(parameters),
+                                         stableTimes(parameters),
+                                         precision(parameters));
+            }
+        } catch (Throwable e) {
+            job.graph().tx().rollback();
+            throw e;
+        }
+    }
+
+    /**
+     * Parse the KEY_CLEAR parameter.
+     * @return null if the key is absent, otherwise the pass number, which
+     *         is either -1 or in range [0, Integer.MAX_VALUE]
+     */
+    protected static Long clearPass(Map<String, Object> parameters) {
+        if (!parameters.containsKey(KEY_CLEAR)) {
+            return null;
+        }
+        long pass = parameterLong(parameters, KEY_CLEAR);
+        // TODO: change to checkNonNegative()
+        E.checkArgument(pass >= 0 || pass == -1,
+                        "The %s parameter must be >= 0 or == -1, but got %s",
+                        KEY_CLEAR, pass);
+        /*
+         * call() narrows the value through intValue(): a larger value could
+         * wrap to a negative int, which the traverser handles as
+         * "drop all passes".
+         */
+        E.checkArgument(pass <= Integer.MAX_VALUE,
+                        "The %s parameter must be <= %s, but got %s",
+                        KEY_CLEAR, Integer.MAX_VALUE, pass);
+        return pass;
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
new file mode 100644
index 000000000..0b3d674aa
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
@@ -0,0 +1,715 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.comm;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.backend.id.IdGenerator;
+import com.baidu.hugegraph.exception.ExistedException;
+import com.baidu.hugegraph.iterator.ListIterator;
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
+import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm.AlgoTraverser;
+import com.baidu.hugegraph.schema.SchemaLabel;
+import com.baidu.hugegraph.schema.SchemaManager;
+import com.baidu.hugegraph.schema.VertexLabel;
+import com.baidu.hugegraph.structure.HugeEdge;
+import com.baidu.hugegraph.structure.HugeVertex;
+import com.baidu.hugegraph.type.define.Directions;
+import com.baidu.hugegraph.util.Log;
+import com.google.common.collect.ImmutableMap;
+
+public class LouvainTraverser extends AlgoTraverser {
+
+    // Prefix of the vertex/edge label of each pass, see labelOfPassN()
+    public static final String C_PASS = "c_pass-";
+    // Property key of a community vertex: its kin value
+    public static final String C_KIN = "c_kin";
+    // Property key of an edge: its weight
+    public static final String C_WEIGHT = "c_weight";
+    // Property key of a community vertex: the ids of its members
+    public static final String C_MEMBERS = "c_members";
+
+    // Property key matched against sourceCLabel when filtering vertices
+    public static final String C_LABEL = LpaAlgorithm.Traverser.C_LABEL;
+
+    // Limit passed to the vertices query and to the neighbors collecting
+    private static final long LIMIT = AbstractAlgorithm.MAX_QUERY_LIMIT;
+
+    private static final Logger LOG = Log.logger(LouvainTraverser.class);
+
+    private final GraphTraversalSource g;
+    // Number of edges in the graph, counted once in the constructor
+    private final long m;
+    // Label of the source vertices of the first pass
+    private final String sourceLabel;
+    // Expected C_LABEL value of vertices, no filtering if null
+    private final String sourceCLabel;
+    // Limit of edges queried per vertex
+    private final long degree;
+    private final Cache cache;
+
+    // Label of the current pass, empty until defineSchemaOfPassN() is called
+    private String passLabel;
+
+    /**
+     * Create a traverser bound to the graph of the job. The total number
+     * of edges is counted here and kept for the whole run.
+     */
+    public LouvainTraverser(Job<Object> job, long degree,
+                            String sourceLabel, String sourceCLabel) {
+        super(job);
+        this.sourceLabel = sourceLabel;
+        this.sourceCLabel = sourceCLabel;
+        this.degree = degree;
+        this.passLabel = "";
+
+        // the traversal source must be ready before counting the edges
+        this.g = this.graph().traversal();
+        this.m = this.g.E().count().next();
+
+        this.cache = new Cache();
+    }
+
+    /**
+     * Generate the id of a merged-community vertex: "0~cid" for pass 0,
+     * otherwise the leading last-pass number of cid is replaced by pass.
+     */
+    @SuppressWarnings("unused")
+    private Id genId2(int pass, Id cid) {
+        String raw = cid.toString();
+        if (pass == 0) {
+            // concat pass with cid
+            return IdGenerator.of(pass + "~" + raw);
+        }
+        // replace last pass with current pass
+        String previous = String.valueOf(pass - 1);
+        assert raw.startsWith(previous);
+        String suffix = raw.substring(previous.length());
+        return IdGenerator.of(pass + suffix);
+    }
+
+    /**
+     * Create the property keys used by community vertices and edges.
+     * @throws IllegalArgumentException if the label of pass 0 already
+     *         exists as a vertex label or an edge label
+     */
+    private void defineSchemaOfPk() {
+        String pass0 = this.labelOfPassN(0);
+        boolean vertexLabelExists = this.graph().existsVertexLabel(pass0);
+        if (vertexLabelExists || this.graph().existsEdgeLabel(pass0)) {
+            throw new IllegalArgumentException(
+                      "Please clear historical results before proceeding");
+        }
+
+        SchemaManager manager = this.graph().schema();
+        manager.propertyKey(C_KIN).asInt().ifNotExist().create();
+        manager.propertyKey(C_MEMBERS).valueSet().asText()
+               .ifNotExist().create();
+        manager.propertyKey(C_WEIGHT).asFloat().ifNotExist().create();
+    }
+
+    /**
+     * Switch the current pass label to the given pass, then create the
+     * vertex label and the edge label of that pass.
+     * @throws IllegalArgumentException if one of the labels already exists
+     */
+    private void defineSchemaOfPassN(int pass) {
+        String label = labelOfPassN(pass);
+        this.passLabel = label;
+
+        SchemaManager manager = this.graph().schema();
+        try {
+            // community vertex: string id, optional kin and members
+            manager.vertexLabel(label).useCustomizeStringId()
+                   .properties(C_KIN, C_MEMBERS)
+                   .nullableKeys(C_KIN, C_MEMBERS)
+                   .create();
+            // community edge: links two community vertices of this pass
+            manager.edgeLabel(label)
+                   .sourceLabel(label)
+                   .targetLabel(label)
+                   .properties(C_WEIGHT)
+                   .create();
+        } catch (ExistedException e) {
+            throw new IllegalArgumentException(
+                      "Please clear historical results before proceeding", e);
+        }
+    }
+
+    /**
+     * @return names of all the edge labels starting with C_PASS
+     */
+    private List<String> cpassEdgeLabels() {
+        List<String> matched = new ArrayList<>();
+        for (SchemaLabel el : this.graph().schema().getEdgeLabels()) {
+            if (!el.name().startsWith(C_PASS)) {
+                continue;
+            }
+            matched.add(el.name());
+        }
+        return matched;
+    }
+
+    /**
+     * @return names of all the vertex labels starting with C_PASS
+     */
+    private List<String> cpassVertexLabels() {
+        List<String> matched = new ArrayList<>();
+        for (SchemaLabel vl : this.graph().schema().getVertexLabels()) {
+            if (!vl.name().startsWith(C_PASS)) {
+                continue;
+            }
+            matched.add(vl.name());
+        }
+        return matched;
+    }
+
+    /**
+     * @return the label shared by the vertices and edges of pass n,
+     *         which is C_PASS followed by n
+     */
+    private String labelOfPassN(int n) {
+        return C_PASS + n;
+    }
+
+    /**
+     * @return the C_WEIGHT property of the edge; an edge without that
+     *         property weighs 1 (edges of a pass are expected to have it)
+     */
+    private float weightOfEdge(Edge e) {
+        boolean passEdge = e.label().startsWith(C_PASS);
+        if (!passEdge && !e.property(C_WEIGHT).isPresent()) {
+            // original edge without weight
+            return 1f;
+        }
+        assert !passEdge || e.property(C_WEIGHT).isPresent();
+        return e.value(C_WEIGHT);
+    }
+
+    /**
+     * @return sum of weightOfEdge() over the edges, in list order
+     */
+    private float weightOfEdges(List<Edge> edges) {
+        float total = 0f;
+        Iterator<Edge> iter = edges.iterator();
+        while (iter.hasNext()) {
+            total += weightOfEdge(iter.next());
+        }
+        return total;
+    }
+
+    /**
+     * Add a vertex with the current pass label and the given id to the
+     * graph, carrying the kin value and the (non-empty) members.
+     */
+    private Vertex newCommunityNode(Id cid, int kin, List<String> members) {
+        assert !members.isEmpty() : members;
+        return this.graph().addVertex(T.label, this.passLabel, T.id, cid,
+                                      C_KIN, kin, C_MEMBERS, members);
+    }
+
+    /**
+     * Construct a vertex object with the current pass label and the given
+     * id; unlike newCommunityNode() it does not call addVertex().
+     */
+    private Vertex makeCommunityNode(Id cid) {
+        VertexLabel vl = this.graph().vertexLabel(this.passLabel);
+        return new HugeVertex(this.graph(), cid, vl);
+    }
+
+    /**
+     * Add an edge with the current pass label from source to target,
+     * carrying the weight as C_WEIGHT property.
+     */
+    private Edge newCommunityEdge(Vertex source, Vertex target, float weight) {
+        return source.addEdge(this.passLabel, target, C_WEIGHT, weight);
+    }
+
+    /**
+     * Store a merged community: one vertex for the community itself, and
+     * one edge for each entry of cedges (other community id -> weight).
+     */
+    private void insertNewCommunity(int pass, Id cid, int kin,
+                                    List<String> members,
+                                    Map<Id, MutableInt> cedges) {
+        // create backend vertex if it's the first time
+        Id nodeId = this.cache.genId(pass, cid);
+        Vertex node = this.newCommunityNode(nodeId, kin, members);
+        commitIfNeeded();
+        // update backend vertex edges
+        for (Map.Entry<Id, MutableInt> entry : cedges.entrySet()) {
+            float weight = entry.getValue().floatValue();
+            Id targetId = this.cache.genId(pass, entry.getKey());
+            Vertex target = this.makeCommunityNode(targetId);
+            this.newCommunityEdge(node, target, weight);
+            commitIfNeeded();
+        }
+        LOG.debug("Add new comm: {} kin={} size={}", node, kin, members.size());
+    }
+
+    /**
+     * @return true if the vertex is a community vertex not created by the
+     *         last pass (any community vertex when pass is 0), or if it
+     *         does not match the configured sourceCLabel
+     */
+    private boolean needSkipVertex(int pass, Vertex v) {
+        String label = v.label();
+        // skip the old intermediate data
+        if (label.startsWith(C_PASS)) {
+            if (pass == 0 || !label.equals(labelOfPassN(pass - 1))) {
+                return true;
+            }
+        }
+        // skip the vertex with unmatched clabel
+        return this.sourceCLabel != null &&
+               !match(v, C_LABEL, this.sourceCLabel);
+    }
+
+    /**
+     * @return the vertices to iterate in the given pass: the vertices of
+     *         sourceLabel for pass 0, otherwise those of the last pass
+     */
+    private Iterator<Vertex> sourceVertices(int pass) {
+        if (pass <= 0) {
+            assert pass == 0;
+            // all vertices at the first time
+            return this.vertices(this.sourceLabel, LIMIT);
+        }
+        // all vertices of merged community
+        return this.vertices(labelOfPassN(pass - 1), LIMIT);
+    }
+
+    /**
+     * @return the edges of both directions of the vertex as a list,
+     *         queried with the configured degree
+     */
+    private List<Edge> neighbors(Id vid) {
+        Iterator<Edge> edges = this.edgesOfVertex(vid, Directions.BOTH,
+                                                  (Id) null, this.degree);
+        @SuppressWarnings("resource")
+        ListIterator<Edge> collected = new ListIterator<>(LIMIT, edges);
+        return (List<Edge>) collected.list();
+    }
+
+    /**
+     * @param edges edges of v, queried through neighbors() if null
+     * @return the cached weight of v if any, otherwise the summed weight
+     *         of its edges, which is then put into the cache
+     */
+    private float weightOfVertex(Vertex v, List<Edge> edges) {
+        Id vid = (Id) v.id();
+        Float cached = this.cache.vertexWeight(vid);
+        if (cached != null) {
+            return cached;
+        }
+        List<Edge> nbs = edges != null ? edges : neighbors(vid);
+        float sum = weightOfEdges(nbs);
+        this.cache.vertexWeight(vid, sum);
+        return sum;
+    }
+
+    /**
+     * @return the C_KIN property of a community vertex, or 0 if v is not
+     *         a community vertex or has no such property
+     */
+    private int kinOfVertex(Vertex v) {
+        if (!v.label().startsWith(C_PASS)) {
+            return 0;
+        }
+        if (!v.property(C_KIN).isPresent()) {
+            return 0;
+        }
+        return v.value(C_KIN);
+    }
+
+    /**
+     * @return the id of the cached community of v, or the id of v itself
+     *         if no community is cached for it
+     */
+    private Id cidOfVertex(Vertex v) {
+        Id vid = (Id) v.id();
+        Community cached = this.cache.vertex2Community(vid);
+        if (cached == null) {
+            return vid;
+        }
+        return cached.cid;
+    }
+
+    /*
+     * 1: wrap original vertex as community node
+     * 2: add original vertices to community node,
+     *    and save as community vertex when merge()
+     * 3: wrap community vertex as community node,
+     *    and repeat step 2 and step 3.
+     *
+     * Return the cached community of the vertex, or create, fill and
+     * cache a new community whose id is the id of the vertex.
+     */
+    private Community wrapCommunity(Vertex otherV) {
+        Id vid = (Id) otherV.id();
+        Community cached = this.cache.vertex2Community(vid);
+        if (cached != null) {
+            return cached;
+        }
+
+        Community created = new Community(vid);
+        // will traverse the neighbors of otherV
+        created.add(this, otherV, null);
+        this.cache.vertex2Community(vid, created);
+        return created;
+    }
+
+    /**
+     * Group the given edges by the community of their other vertex.
+     * @return pairs of [community, accumulated 2 * weight of the edges
+     *         leading to that community], skipped vertices are excluded
+     */
+    private Collection<Pair<Community, MutableInt>> nbCommunities(
+                                                    int pass,
+                                                    List<Edge> edges) {
+        // grouped is a map of cid:[community,weight]
+        Map<Id, Pair<Community, MutableInt>> grouped = new HashMap<>();
+        for (Edge edge : edges) {
+            Vertex otherV = ((HugeEdge) edge).otherVertex();
+            if (needSkipVertex(pass, otherV)) {
+                // skip the old intermediate data, or filter clabel
+                continue;
+            }
+            Community comm = wrapCommunity(otherV);
+            Pair<Community, MutableInt> entry = grouped.get(comm.cid);
+            if (entry == null) {
+                entry = Pair.of(comm, new MutableInt(0));
+                grouped.put(comm.cid, entry);
+            }
+            /*
+             * calc weight between source vertex and neighbor community
+             * NOTE(review): a float is added to a MutableInt here, any
+             * fractional part is presumably dropped - confirm intended
+             */
+            entry.getRight().add(2 * weightOfEdge(edge));
+        }
+        return grouped.values();
+    }
+
+    /**
+     * Move v into newC: take it out of its cached community (if any),
+     * add it to newC, then record newC as the community of v.
+     */
+    private void moveCommunity(Vertex v, List<Edge> nbs, Community newC) {
+        Id vid = (Id) v.id();
+
+        // leave the previous community, if v belongs to one
+        Community previous = this.cache.vertex2Community(vid);
+        if (previous != null) {
+            previous.remove(this, v, nbs);
+        }
+
+        // join the new community
+        newC.add(this, v, nbs);
+        LOG.debug("Move {} to comm: {}", v, newC);
+
+        // remember the community of v
+        this.cache.vertex2Community(vid, newC);
+    }
+
+    /**
+     * Run one round of phase 1: for each source vertex of the pass, move
+     * it to the neighbor community with the largest positive deltaQ.
+     * @return the ratio of moved vertices to visited (non-skipped)
+     *         vertices, 0 if no vertex was visited
+     */
+    private double moveCommunities(int pass) {
+        Iterator<Vertex> vertices = this.sourceVertices(pass);
+
+        // shuffle
+        //r = r.order().by(shuffle);
+
+        long total = 0L;
+        long moved = 0L;
+        while (vertices.hasNext()) {
+            // progress counts every fetched vertex, including skipped ones
+            this.updateProgress(++this.progress);
+            Vertex v = vertices.next();
+            if (needSkipVertex(pass, v)) {
+                // skip the old intermediate data, or filter clabel
+                continue;
+            }
+            total++;
+            Id cid = cidOfVertex(v);
+            List<Edge> nbs = neighbors((Id) v.id());
+            // ki: stored kin of v plus the weight of its edges
+            double ki = kinOfVertex(v) + weightOfVertex(v, nbs);
+            // update community of v if △Q changed
+            double maxDeltaQ = 0d;
+            Community bestComm = null;
+            // list all neighbor communities of v
+            for (Pair<Community, MutableInt> nbc : nbCommunities(pass, nbs)) {
+                // △Q = (Ki_in - Ki * Etot / m) / 2m
+                Community otherC = nbc.getLeft();
+                // weight between c and otherC
+                double kiin = nbc.getRight().floatValue();
+                // weight of otherC
+                int tot = otherC.kin() + otherC.kout();
+                if (cid.equals(otherC.cid)) {
+                    // exclude v itself when otherC is its own community
+                    // NOTE(review): int -= double narrows the result to int
+                    tot -= ki;
+                    assert tot >= 0;
+                    // expect tot >= 0, but may be something wrong?
+                    if (tot < 0) {
+                        tot = 0;
+                    }
+                }
+                // the constant factor 1/2m of △Q is omitted when comparing
+                double deltaQ = kiin - ki * tot / this.m;
+                if (deltaQ > maxDeltaQ) {
+                    // TODO: cache otherC for neighbors the same community
+                    maxDeltaQ = deltaQ;
+                    bestComm = otherC;
+                }
+            }
+            // bestComm is non-null whenever maxDeltaQ > 0
+            if (maxDeltaQ > 0d && !cid.equals(bestComm.cid)) {
+                moved++;
+                // move v to the community of maxQ neighbor
+                moveCommunity(v, nbs, bestComm);
+            }
+        }
+
+        // may keep oscillating when a degree limit is set
+        return total == 0L ? 0d : (double) moved / total;
+    }
+
+    /**
+     * Run phase 2: store each non-empty cached community as one vertex of
+     * the given pass, together with the edges to other communities, then
+     * commit and reset the cache.
+     */
+    private void mergeCommunities(int pass) {
+        // merge each community as a vertex
+        Collection<Pair<Community, Set<Id>>> comms = this.cache.communities();
+        this.cache.resetVertexWeight();
+        for (Pair<Community, Set<Id>> pair : comms) {
+            Community c = pair.getKey();
+            if (c.empty()) {
+                continue;
+            }
+            // update kin and edges between communities
+            int kin = c.kin();
+            Set<Id> vertices = pair.getRight();
+            assert !vertices.isEmpty();
+            List<String> members = new ArrayList<>(vertices.size());
+            // cedges is a map of other community id -> summed edge weight
+            Map<Id, MutableInt> cedges = new HashMap<>(vertices.size());
+            for (Id v : vertices) {
+                members.add(v.toString());
+                // collect edges between this community and other communities
+                List<Edge> neighbors = neighbors(v);
+                for (Edge edge : neighbors) {
+                    Vertex otherV = ((HugeEdge) edge).otherVertex();
+                    if (vertices.contains(otherV.id())) {
+                        // inner edges of this community, will be calc twice
+                        // due to both e-in and e-out are in vertices,
+                        // NOTE(review): int += float narrows the sum to int
+                        kin += weightOfEdge(edge);
+                        continue;
+                    }
+                    Id otherCid = cidOfVertex(otherV);
+                    if (otherCid.compareTo(c.cid) < 0) {
+                        // skip if it should be collected by otherC
+                        continue;
+                    }
+                    if (!cedges.containsKey(otherCid)) {
+                        cedges.put(otherCid, new MutableInt(0));
+                    }
+                    cedges.get(otherCid).add(weightOfEdge(edge));
+                }
+            }
+            // insert new community vertex and edges into storage
+            this.insertNewCommunity(pass, c.cid, kin, members, cedges);
+        }
+        this.graph().tx().commit();
+        // reset communities
+        this.cache.reset();
+    }
+
+    /**
+     * Run the louvain algorithm: each pass repeats moveCommunities() until
+     * nothing moves or a stop condition is hit, then merges the
+     * communities into the vertices/edges of that pass.
+     * @param maxTimes    the maximum number of passes
+     * @param stableTimes the number of tiny-change rounds tolerated within
+     *                    one pass before it's stopped
+     * @param precision   the threshold of the moved ratio (and its change)
+     * @return a map with keys pass_times, phase1_times, last_precision,
+     *         times and communities
+     */
+    public Object louvain(int maxTimes, int stableTimes, double precision) {
+        assert maxTimes > 0;
+        assert precision > 0d;
+
+        this.defineSchemaOfPk();
+
+        /*
+         * iterate until it has stabilized or
+         * the maximum number of times is reached
+         */
+        int times = maxTimes;
+        int movedTimes = 0;
+        double movedPercent = 0d;
+        double lastMovedPercent = 0d;
+
+        for (int i = 0; i < maxTimes; i++) {
+            // stays true only if the very first round of this pass moves nothing
+            boolean finished = true;
+            movedPercent = 0d;
+            lastMovedPercent = 1d;
+            int tinyChanges = 0;
+            while ((movedPercent = this.moveCommunities(i)) > 0d) {
+                movedTimes++;
+                finished = false;
+                if (lastMovedPercent - movedPercent < precision) {
+                    tinyChanges++;
+                }
+                if (i == 0 && movedPercent < precision) {
+                    // stop the first round of iterations early
+                    break;
+                }
+                if (tinyChanges >= stableTimes) {
+                    // maybe always shaking and falling into a dead loop
+                    break;
+                }
+                lastMovedPercent = movedPercent;
+            }
+            if (finished) {
+                // nothing moved in pass i: record it and stop iterating
+                times = i;
+                break;
+            } else {
+                this.defineSchemaOfPassN(i);
+                this.mergeCommunities(i);
+            }
+        }
+
+        long communities = 0L;
+        // passLabel is still empty if no pass has been merged
+        String commLabel = this.passLabel;
+        if (!commLabel.isEmpty()) {
+            GraphTraversal<?, Long> t = this.g.V().hasLabel(commLabel).count();
+            communities = this.execute(t, t::next);
+        }
+        return ImmutableMap.of("pass_times", times,
+                               "phase1_times", movedTimes,
+                               "last_precision", movedPercent,
+                               "times", maxTimes,
+                               "communities", communities);
+    }
+
+    /**
+     * Calculate the modularity from the stored community vertices and
+     * edges of the given pass.
+     * NOTE(review): if the total weight m is 0 while community vertices
+     * exist, the division below yields NaN - confirm whether that can occur.
+     * @param pass the pass whose label is used to select the communities
+     * @return the sum of (I/M - ((2I+O)/2M)^2) over all the communities
+     */
+    public double modularity(int pass) {
+        // pass: label the last pass
+        String label = labelOfPassN(pass);
+        Number kin = this.g.V().hasLabel(label).values(C_KIN).sum().next();
+        Number weight = this.g.E().hasLabel(label).values(C_WEIGHT).sum().next();
+        // m: summed kin of the communities plus twice the summed edge weight
+        double m = kin.intValue() + weight.floatValue() * 2.0d;
+        double q = 0.0d;
+        Iterator<Vertex> coms = this.g.V().hasLabel(label);
+        while (coms.hasNext()) {
+            Vertex com = coms.next();
+            int cin = com.value(C_KIN);
+            // cout: summed weight of the edges of this community vertex
+            Number cout = this.g.V(com).bothE().values(C_WEIGHT).sum().next();
+            double cdegree = cin + cout.floatValue();
+            // Q = ∑(I/M - ((2I+O)/2M)^2)
+            q += cin / m - Math.pow(cdegree / m, 2);
+        }
+        return q;
+    }
+
+    /**
+     * Expand a community into its members level by level, stopping once
+     * the vertices of pass 0 have been expanded or nothing is left.
+     * @param community the id of the community vertex to start from
+     * @return the members collected at the last expanded level
+     */
+    public Collection<Object> showCommunity(String community) {
+        final String pass0Label = labelOfPassN(0);
+        Collection<Object> current = Arrays.asList(community);
+        boolean reachedPass0 = false;
+        while (!current.isEmpty() && !reachedPass0) {
+            Iterator<Vertex> nodes = this.vertices(current.iterator());
+            Collection<Object> expanded = new HashSet<>();
+            current = expanded;
+            while (nodes.hasNext()) {
+                this.updateProgress(++this.progress);
+                Vertex node = nodes.next();
+                if (!node.property(C_MEMBERS).isPresent()) {
+                    continue;
+                }
+                Set<Object> members = node.value(C_MEMBERS);
+                reachedPass0 = node.label().equals(pass0Label);
+                expanded.addAll(members);
+            }
+        }
+        return current;
+    }
+
+    /**
+     * Drop the stored results of louvain: first the edges and the edge
+     * label(s), then the vertices and the vertex label(s).
+     * @param pass the pass to clear, any negative value clears all the
+     *             labels starting with C_PASS
+     * @return the value of this.progress after dropping
+     */
+    public long clearPass(int pass) {
+        GraphTraversal<Edge, Edge> te = this.g.E();
+        if (pass < 0) {
+            // drop edges of all pass
+            List<String> els = this.cpassEdgeLabels();
+            if (els.size() > 0) {
+                // hasLabel() takes the first label plus the remaining ones
+                String first = els.remove(0);
+                te = te.hasLabel(first, els.toArray(new String[els.size()]));
+                this.drop(te);
+            }
+            // drop schema (query again since the list above was modified)
+            for (String label : this.cpassEdgeLabels()) {
+                this.graph().schema().edgeLabel(label).remove();
+            }
+        } else {
+            // drop edges of pass N
+            String label = labelOfPassN(pass);
+            if (this.graph().existsEdgeLabel(label)) {
+                te = te.hasLabel(label);
+                this.drop(te);
+                // drop schema
+                this.graph().schema().edgeLabel(label).remove();
+            }
+        }
+
+        GraphTraversal<Vertex, Vertex> tv = this.g.V();
+        if (pass < 0) {
+            // drop vertices of all pass
+            List<String> vls = this.cpassVertexLabels();
+            if (vls.size() > 0) {
+                // hasLabel() takes the first label plus the remaining ones
+                String first = vls.remove(0);
+                tv = tv.hasLabel(first, vls.toArray(new String[vls.size()]));
+                this.drop(tv);
+            }
+            // drop schema (query again since the list above was modified)
+            for (String label : this.cpassVertexLabels()) {
+                this.graph().schema().vertexLabel(label).remove();
+            }
+        } else {
+            // drop vertices of pass N
+            String label = labelOfPassN(pass);
+            if (this.graph().existsVertexLabel(label)) {
+                tv = tv.hasLabel(label);
+                this.drop(tv);
+                // drop schema
+                this.graph().schema().vertexLabel(label).remove();
+            }
+        }
+
+        return this.progress;
+    }
+
+    /*
+     * In-memory counters of a single community: the number of members and
+     * the accumulated kin/kout weights of the members added to it.
+     */
+    private static class Community {
+
+        // Id of the community (stored as a backend vertex)
+        private final Id cid;
+        // Number of members currently held by the community
+        private int size = 0;
+        /*
+         * Weight of all edges in the community (2X), sum of kin of new
+         * members [each is from the last pass, stored in backend vertex]
+         */
+        private int kin = 0;
+        /*
+         * Weight of all edges between communities, sum of kout of new
+         * members [each is last pass, calculated in real time by neighbors]
+         */
+        private int kout = 0;
+
+        public Community(Id cid) {
+            this.cid = cid;
+        }
+
+        public int kin() {
+            return this.kin;
+        }
+
+        public int kout() {
+            return this.kout;
+        }
+
+        // True when the community holds no member at all
+        public boolean empty() {
+            return this.size < 1;
+        }
+
+        // Count vertex v as a member and accumulate its kin and weight
+        public void add(LouvainTraverser t, Vertex v, List<Edge> nbs) {
+            this.size += 1;
+            this.kin += t.kinOfVertex(v);
+            this.kout += t.weightOfVertex(v, nbs);
+        }
+
+        // Undo add(): take vertex v and its kin and weight out again
+        public void remove(LouvainTraverser t, Vertex v, List<Edge> nbs) {
+            this.size -= 1;
+            this.kin -= t.kinOfVertex(v);
+            this.kout -= t.weightOfVertex(v, nbs);
+        }
+
+        @Override
+        public String toString() {
+            return String.format("[%s](size=%s kin=%s kout=%s)",
+                                 this.cid, this.size, this.kin, this.kout);
+        }
+    }
+
+    /*
+     * In-memory lookup tables: the weight of a vertex, the community that
+     * a vertex belongs to, and the sequence numbers handed out by genId().
+     */
+    private static class Cache {
+
+        // vertex id => weight of the vertex
+        private final Map<Id, Float> vertexWeightCache;
+        // vertex id => community the vertex belongs to
+        private final Map<Id, Community> vertex2Community;
+        // community id => sequence number (starts at 1, see genId())
+        private final Map<Id, Integer> genIds;
+
+        public Cache() {
+            this.vertexWeightCache = new HashMap<>();
+            this.vertex2Community = new HashMap<>();
+            this.genIds = new HashMap<>();
+        }
+
+        // Community recorded for the vertex, or null if there is none
+        public Community vertex2Community(Id id) {
+            return this.vertex2Community.get(id);
+        }
+
+        public void vertex2Community(Id id, Community c) {
+            this.vertex2Community.put(id, c);
+        }
+
+        // Weight recorded for the vertex, or null if there is none
+        public Float vertexWeight(Id id) {
+            return this.vertexWeightCache.get(id);
+        }
+
+        public void vertexWeight(Id id, float weight) {
+            this.vertexWeightCache.put(id, weight);
+        }
+
+        // Forget the vertex weights only
+        public void resetVertexWeight() {
+            this.vertexWeightCache.clear();
+        }
+
+        // Forget everything: weights, communities and generated ids
+        public void reset() {
+            this.vertexWeightCache.clear();
+            this.vertex2Community.clear();
+            this.genIds.clear();
+        }
+
+        /*
+         * Build an id of the form "<pass>~<seq>", where seq is assigned to
+         * cid on its first call and reused on later calls
+         */
+        public Id genId(int pass, Id cid) {
+            Integer seq = this.genIds.get(cid);
+            if (seq == null) {
+                seq = this.genIds.size() + 1;
+                this.genIds.put(cid, seq);
+            }
+            return IdGenerator.of(pass + "~" + seq);
+        }
+
+        /*
+         * Group the cached vertices by community id, skipping empty
+         * communities; each pair is a community with its member ids
+         */
+        public Collection<Pair<Community, Set<Id>>> communities() {
+            // TODO: get communities from backend store instead of ram
+            Map<Id, Pair<Community, Set<Id>>> grouped = new HashMap<>();
+            for (Entry<Id, Community> entry : this.vertex2Community.entrySet()) {
+                Community comm = entry.getValue();
+                if (!comm.empty()) {
+                    // collect members joined to the community [current pass]
+                    grouped.computeIfAbsent(comm.cid, k -> {
+                        return Pair.<Community, Set<Id>>of(comm,
+                                                           new HashSet<>());
+                    }).getRight().add(entry.getKey());
+                }
+            }
+            return grouped.values();
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
new file mode 100644
index 000000000..af7b299ae
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
@@ -0,0 +1,263 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.comm;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.schema.SchemaManager;
+import com.baidu.hugegraph.schema.VertexLabel;
+import com.baidu.hugegraph.type.define.Directions;
+import com.google.common.collect.ImmutableMap;
+
+/*
+ * Label propagation: every vertex repeatedly adopts the community label
+ * that is most frequent among its neighbors; the label is kept in the
+ * vertex property "c_label".
+ */
+public class LpaAlgorithm extends AbstractCommAlgorithm {
+
+    @Override
+    public String name() {
+        return "lpa";
+    }
+
+    /*
+     * Run every parameter parser once, so that an invalid parameter is
+     * reported by its parser; the parsed values are discarded here
+     */
+    @Override
+    public void checkParameters(Map<String, Object> parameters) {
+        times(parameters);
+        precision(parameters);
+        sourceLabel(parameters);
+        edgeLabel(parameters);
+        direction(parameters);
+        degree(parameters);
+        showCommunity(parameters);
+    }
+
+    /*
+     * List the members of one community if the show-community parameter
+     * is given, otherwise run the label propagation. The transaction of
+     * the job's graph is rolled back before any failure is rethrown.
+     */
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        Traverser traverser = new Traverser(job);
+        String community = showCommunity(parameters);
+
+        try {
+            if (community == null) {
+                return traverser.lpa(sourceLabel(parameters),
+                                     edgeLabel(parameters),
+                                     direction(parameters),
+                                     degree(parameters),
+                                     times(parameters),
+                                     precision(parameters));
+            }
+            return traverser.showCommunity(community);
+        } catch (Throwable e) {
+            job.graph().tx().rollback();
+            throw e;
+        }
+    }
+
+    public static class Traverser extends AlgoTraverser {
+
+        // Name of the vertex property holding the community label
+        public static final String C_LABEL = "c_label";
+        private static final long LIMIT = MAX_QUERY_LIMIT;
+
+        // Breaks the tie when several labels share the top frequency
+        private final Random random = new Random();
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /*
+         * Run up to maxTimes rounds of label propagation, and stop early
+         * once the ratio of changed vertices in a round is <= precision.
+         * Return a map with: "iteration_times" (rounds actually run),
+         * "last_precision" (changed ratio of the last round), "times"
+         * (maxTimes) and "communities" (number of distinct labels).
+         */
+        public Object lpa(String sourceLabel, String edgeLabel,
+                          Directions dir, long degree,
+                          int maxTimes, double precision) {
+            assert maxTimes > 0;
+            assert precision > 0d;
+
+            this.initSchema();
+
+            int times = maxTimes;
+            double changedPercent = 0d;
+
+            /*
+             * Iterate until:
+             *  1.it has stabilized
+             *  2.or the maximum number of times is reached
+             */
+            int round = 0;
+            while (round < maxTimes) {
+                round++;
+                changedPercent = this.detectCommunities(sourceLabel, edgeLabel,
+                                                        dir, degree);
+                if (changedPercent <= precision) {
+                    times = round;
+                    break;
+                }
+            }
+
+            // NOTE: distinct labels are counted over at most 10000 vertices
+            long communities = this.graph().traversal().V().limit(10000L)
+                                   .groupCount().by(C_LABEL)
+                                   .count(Scope.local).next();
+            return ImmutableMap.of("iteration_times", times,
+                                   "last_precision", changedPercent,
+                                   "times", maxTimes,
+                                   "communities", communities);
+        }
+
+        /*
+         * Return, as a json list, the ids of the vertices whose c_label
+         * matches the specified clabel
+         */
+        public Object showCommunity(String clabel) {
+            // all vertices with specified c-label
+            Iterator<Vertex> members = this.vertices(LIMIT);
+            members = filter(members, C_LABEL, clabel);
+
+            JsonMap json = new JsonMap();
+            json.startList();
+            while (members.hasNext()) {
+                this.updateProgress(++this.progress);
+                Vertex member = members.next();
+                json.append(member.id().toString());
+            }
+            json.endList();
+
+            return json.asJson();
+        }
+
+        /*
+         * Run one round: let every vertex of sourceLabel vote for a label,
+         * store the label if it's new or different, then commit.
+         * Return changed/total, or 0 if there is no vertex at all.
+         */
+        private double detectCommunities(String sourceLabel, String edgeLabel,
+                                         Directions dir, long degree) {
+            /*
+             * The vertices are not shuffled here, ideas noted so far:
+             *  r.order().by(shuffle)
+             *  r = this.graph().traversal().V().sample((int) LIMIT)
+             */
+
+            // all vertices
+            Iterator<Vertex> vertices = this.vertices(sourceLabel, LIMIT);
+
+            long total = 0L;
+            long changed = 0L;
+            while (vertices.hasNext()) {
+                this.updateProgress(++this.progress);
+                total++;
+                Vertex v = vertices.next();
+                String voted = this.voteCommunityOfVertex(v, edgeLabel,
+                                                          dir, degree);
+                // keep the label only if it's present and hasn't changed
+                boolean unchanged = labelPresent(v) &&
+                                    voted.equals(this.labelOfVertex(v));
+                if (!unchanged) {
+                    changed++;
+                    this.updateLabelOfVertex(v, voted);
+                }
+            }
+            this.graph().tx().commit();
+
+            if (total == 0L) {
+                return 0d;
+            }
+            return (double) changed / total;
+        }
+
+        /*
+         * Pick the label that is most frequent among the neighbors of the
+         * vertex; a tie is broken randomly. A vertex without any labeled
+         * neighbor keeps its own label.
+         */
+        private String voteCommunityOfVertex(Vertex vertex, String edgeLabel,
+                                             Directions dir, long degree) {
+            // neighbors of source vertex v
+            Id source = (Id) vertex.id();
+            Id labelId = this.getEdgeLabelId(edgeLabel);
+            Iterator<Id> neighbors = this.adjacentVertices(source, dir,
+                                                           labelId, degree);
+
+            /*
+             * Whether or not to include the vertex itself greatly affects
+             * the result: a larger number of small communities is got if
+             * it's included (like neighbors.inject(v)); it's excluded here
+             */
+
+            // calculate label frequency
+            Map<String, MutableInt> votes = new HashMap<>();
+            while (neighbors.hasNext()) {
+                Id neighbor = neighbors.next();
+                String label = this.labelOfVertex(neighbor);
+                if (label != null) {
+                    votes.computeIfAbsent(label, k -> new MutableInt(0))
+                         .increment();
+                }
+                // else: ignore invalid or not-exist vertex
+            }
+
+            // isolated vertex
+            if (votes.isEmpty()) {
+                return this.labelOfVertex(vertex);
+            }
+
+            // find the maximum frequency, every count is at least 1
+            int maxFreq = 1;
+            for (MutableInt count : votes.values()) {
+                maxFreq = Math.max(maxFreq, count.intValue());
+            }
+            // get the labels with maximum frequency
+            List<String> candidates = new ArrayList<>();
+            for (Map.Entry<String, MutableInt> vote : votes.entrySet()) {
+                if (vote.getValue().intValue() == maxFreq) {
+                    candidates.add(vote.getKey());
+                }
+            }
+
+            /*
+             * TODO:
+             * keep origin label with probability to prevent monster communities
+             */
+
+            // random choice
+            return candidates.get(this.random.nextInt(candidates.size()));
+        }
+
+        private boolean labelPresent(Vertex vertex) {
+            return vertex.property(C_LABEL).isPresent();
+        }
+
+        // The stored c_label, or the vertex id as string if not labeled yet
+        private String labelOfVertex(Vertex vertex) {
+            if (labelPresent(vertex)) {
+                return vertex.value(C_LABEL);
+            }
+            return vertex.id().toString();
+        }
+
+        // Label of the vertex with the given id, or null if it's not found
+        private String labelOfVertex(Id vid) {
+            // TODO: cache with Map<Id, String>
+            Iterator<Vertex> found = this.graph().vertices(vid);
+            return found.hasNext() ? this.labelOfVertex(found.next()) : null;
+        }
+
+        private void updateLabelOfVertex(Vertex v, String label) {
+            // TODO: cache with Map<Id, String>
+            v.property(C_LABEL, label);
+            this.commitIfNeeded();
+        }
+
+        /*
+         * Create the text property key c_label if it doesn't exist, and
+         * append it as a nullable property to every vertex label
+         */
+        private void initSchema() {
+            SchemaManager schema = this.graph().schema();
+            schema.propertyKey(C_LABEL).asText().ifNotExist().create();
+            for (VertexLabel vertexLabel : schema.getVertexLabels()) {
+                schema.vertexLabel(vertexLabel.name())
+                      .properties(C_LABEL).nullableKeys(C_LABEL)
+                      .append();
+            }
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java
new file mode 100644
index 000000000..c47d19f65
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.comm;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.tinkerpop.gremlin.structure.Edge;
+
+import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.structure.HugeEdge;
+import com.baidu.hugegraph.type.define.Directions;
+import com.baidu.hugegraph.util.InsertionOrderUtil;
+import com.google.common.collect.ImmutableMap;
+
+/*
+ * Count the triangles of the graph by scanning all the edges of one
+ * direction (OUT or IN).
+ */
+public class TriangleCountAlgorithm extends AbstractCommAlgorithm {
+
+    @Override
+    public String name() {
+        return "triangle_count";
+    }
+
+    /*
+     * Run the parameter parsers once so that an invalid parameter is
+     * reported by its parser; the parsed values are discarded here
+     */
+    @Override
+    public void checkParameters(Map<String, Object> parameters) {
+        direction(parameters);
+        degree(parameters);
+    }
+
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        Traverser traverser = new Traverser(job);
+        return traverser.triangleCount(direction(parameters),
+                                       degree(parameters));
+    }
+
+    protected static class Traverser extends AlgoTraverser {
+
+        protected static final String KEY_TRIANGLES = "triangles";
+        protected static final String KEY_TRIADS = "triads";
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /*
+         * Return the result of triangles() as a mutable map, without the
+         * "triads" entry
+         */
+        public Object triangleCount(Directions direction, long degree) {
+            Map<String, Long> results = triangles(direction, degree);
+            // Copy it first, the map returned by triangles() is immutable
+            results = InsertionOrderUtil.newMap(results);
+            results.remove(KEY_TRIADS);
+            return results;
+        }
+
+        /*
+         * Scan all the edges of the given direction and count, per source
+         * vertex, the triangles and triads formed by its adjacent vertices.
+         * Consecutive edges with an equal owner vertex id are treated as
+         * the edges of one source vertex.
+         * NOTE(review): this relies on this.edges(direction) returning the
+         * edges grouped by owner vertex - confirm against AlgoTraverser.
+         *
+         * Return an immutable map with: "edges_<dir>", "vertices_<dir>",
+         * "triangles" and "triads".
+         * Throw IllegalArgumentException if direction is null or BOTH.
+         */
+        protected Map<String, Long> triangles(Directions direction,
+                                              long degree) {
+            if (direction == null || direction == Directions.BOTH) {
+                throw new IllegalArgumentException("Direction must be OUT/IN");
+            }
+            assert direction == Directions.OUT || direction == Directions.IN;
+
+            Iterator<Edge> edges = this.edges(direction);
+
+            long triangles = 0L;
+            long triads = 0L;
+            long total = 0L;
+            long totalVertices = 0L;
+            // Id of the source vertex whose edges are being collected
+            Id vertex = null;
+
+            Set<Id> adjVertices = new HashSet<>();
+            while (edges.hasNext()) {
+                HugeEdge edge = (HugeEdge) edges.next();
+                this.updateProgress(++total);
+
+                Id source = edge.ownerVertex().id();
+                Id target = edge.otherVertex().id();
+                /*
+                 * Compare the ids by value instead of by reference: two
+                 * edges of the same source vertex may carry distinct Id
+                 * instances, and '==' would then split their adjacent
+                 * vertices into several groups and corrupt the counts
+                 */
+                if (vertex != null && vertex.equals(source)) {
+                    // Ignore and skip the target vertex if exceed degree
+                    if (adjVertices.size() < degree || degree == NO_LIMIT) {
+                        adjVertices.add(target);
+                    }
+                    continue;
+                }
+
+                if (vertex != null) {
+                    assert !vertex.equals(source);
+                    /*
+                     * Find graph mode like this:
+                     * A -> [B,C,D,E,F]
+                     *      B -> [D,F]
+                     *      E -> [B,C,F]
+                     */
+                    triangles += this.intersect(direction, degree, adjVertices);
+                    triads += this.localTriads(adjVertices.size());
+                    totalVertices++;
+                    // Reset for the next source
+                    adjVertices = new HashSet<>();
+                }
+                vertex = source;
+                adjVertices.add(target);
+            }
+
+            // Flush the adjacent vertices of the last source vertex
+            if (vertex != null) {
+                triangles += this.intersect(direction, degree, adjVertices);
+                triads += this.localTriads(adjVertices.size());
+                totalVertices++;
+            }
+
+            String suffix = "_" + direction.string();
+            return ImmutableMap.of("edges" + suffix, total,
+                                   "vertices" + suffix, totalVertices,
+                                   KEY_TRIANGLES, triangles,
+                                   KEY_TRIADS, triads);
+        }
+
+        /*
+         * For each vertex in adjVertices, count how many of its own
+         * adjacent vertices (same direction, limited by degree) are also
+         * in adjVertices, and return the sum
+         */
+        protected long intersect(Directions dir, long degree,
+                                 Set<Id> adjVertices) {
+            long count = 0L;
+            Iterator<Id> vertices;
+            for (Id v : adjVertices) {
+                vertices = this.adjacentVertices(v, dir, null, degree);
+                while (vertices.hasNext()) {
+                    Id vertex = vertices.next();
+                    if (adjVertices.contains(vertex)) {
+                        count++;
+                    }
+                }
+            }
+            return count;
+        }
+
+        // Number of unordered pairs among 'size' vertices: size*(size-1)/2
+        protected long localTriads(int size) {
+            // 1L forces the multiplication to be done in long arithmetic
+            return size * (size - 1L) / 2L;
+        }
+    }
+}