You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by ji...@apache.org on 2022/11/09 10:25:18 UTC

[incubator-hugegraph] 15/33: add BOTH direction support for triangle_count/cluster_coeffcient (#24)

This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git

commit 53025bd43bfe537a033597cea51720620089c25b
Author: Jermy Li <li...@baidu.com>
AuthorDate: Wed Jun 24 17:58:34 2020 +0800

    add BOTH direction support for triangle_count/cluster_coeffcient (#24)
    
    change log:
    1. add BOTH direction support for triangle_count/cluster_coeffcient.
    2. fix extra triangle count with multiple edges between two adjacent vertices.
    3. set default value of direction to BOTH for degree_centrality and cluster_coeffcient.
    4. add workers for triangle_count and cluster_coeffcient.
    5. fix closeness: multiple shortest paths between the same pair of vertices caused illogical results.
    6. rename rings_detect to rings, rename limit to each_limit which means limit number of rings of each source vertex, and don't do dedup if passed each_limit > 0.
    7. unify top for 4 centrality algos: sorted results when top = -1, unsorted results when top = 0.
    8. fusiform: rename top to top_similars (expected >= 0).
    9. fusiform/rings: add limit param which means max number of results, and remove capacity param and hardcode to 100000000.
    
    Change-Id: I9ddf8553e6d86b99adbff8b972890d69d623fa1a
---
 .../hugegraph/job/algorithm/AbstractAlgorithm.java |  52 ++++++---
 .../baidu/hugegraph/job/algorithm/Consumers.java   |  25 ++++-
 .../job/algorithm/SubgraphStatAlgorithm.java       |   5 +
 .../job/algorithm/cent/AbstractCentAlgorithm.java  |  30 ++++-
 .../cent/BetweenessCentralityAlgorithm.java        |  14 +--
 .../cent/ClosenessCentralityAlgorithm.java         |  12 +-
 .../algorithm/cent/DegreeCentralityAlgorithm.java  |  22 ++--
 .../cent/EigenvectorCentralityAlgorithm.java       |  11 +-
 .../algorithm/comm/ClusterCoeffcientAlgorithm.java |  27 ++++-
 .../job/algorithm/comm/KCoreAlgorithm.java         |   6 +-
 .../job/algorithm/comm/LouvainAlgorithm.java       |   4 +-
 .../job/algorithm/comm/LouvainTraverser.java       |   6 +-
 .../hugegraph/job/algorithm/comm/LpaAlgorithm.java |   6 +-
 .../job/algorithm/comm/TriangleCountAlgorithm.java | 121 ++++++++++++++++++---
 .../job/algorithm/path/RingsDetectAlgorithm.java   |  55 ++++++----
 .../similarity/FusiformSimilarityAlgorithm.java    |  50 ++++++---
 16 files changed, 315 insertions(+), 131 deletions(-)

diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
index 67d508a47..e30ce1e16 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
@@ -36,6 +36,7 @@ import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.Transaction;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
 
 import com.baidu.hugegraph.HugeException;
 import com.baidu.hugegraph.backend.id.Id;
@@ -44,6 +45,7 @@ import com.baidu.hugegraph.backend.query.Query;
 import com.baidu.hugegraph.iterator.FilterIterator;
 import com.baidu.hugegraph.iterator.FlatMapperIterator;
 import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.job.algorithm.Consumers.StopExecution;
 import com.baidu.hugegraph.testutil.Whitebox;
 import com.baidu.hugegraph.traversal.algorithm.HugeTraverser;
 import com.baidu.hugegraph.type.HugeType;
@@ -61,6 +63,7 @@ public abstract class AbstractAlgorithm implements Algorithm {
 
     public static final long MAX_RESULT_SIZE = 100L * Bytes.MB;
     public static final long MAX_QUERY_LIMIT = 100000000L; // about 100GB
+    public static final long MAX_CAPACITY = MAX_QUERY_LIMIT;
     public static final int BATCH = 500;
 
     public static final String CATEGORY_AGGR = "aggregate";
@@ -87,11 +90,13 @@ public abstract class AbstractAlgorithm implements Algorithm {
     public static final String KEY_CLEAR = "clear";
     public static final String KEY_CAPACITY = "capacity";
     public static final String KEY_LIMIT = "limit";
+    public static final String KEY_EACH_LIMIT = "each_limit";
     public static final String KEY_ALPHA = "alpha";
     public static final String KEY_WORKERS = "workers";
 
     public static final long DEFAULT_CAPACITY = 10000000L;
     public static final long DEFAULT_LIMIT = 100L;
+    public static final long DEFAULT_EACH_LIMIT = 1L;
     public static final long DEFAULT_DEGREE = 100L;
     public static final long DEFAULT_SAMPLE = 1L;
     public static final long DEFAULT_TIMES = 20L;
@@ -131,6 +136,14 @@ public abstract class AbstractAlgorithm implements Algorithm {
         return parseDirection(direction);
     }
 
+    protected static Directions direction4Out(Map<String, Object> parameters) {
+        if (!parameters.containsKey(KEY_DIRECTION)) {
+            return Directions.OUT;
+        }
+        Object direction = parameter(parameters, KEY_DIRECTION);
+        return parseDirection(direction);
+    }
+
     protected static Directions directionOutIn(Map<String, Object> parameters) {
         if (!parameters.containsKey(KEY_DIRECTION)) {
             return Directions.OUT;
@@ -148,14 +161,10 @@ public abstract class AbstractAlgorithm implements Algorithm {
             return DEFAULT_ALPHA;
         }
         double alpha = parameterDouble(parameters, KEY_ALPHA);
-        checkAlpha(alpha);
-        return alpha;
-    }
-
-    public static void checkAlpha(double alpha) {
-        E.checkArgument(alpha > 0 && alpha <= 1.0,
+        E.checkArgument(alpha > 0.0 && alpha <= 1.0,
                         "The value of %s must be in range (0, 1], but got %s",
                         KEY_ALPHA, alpha);
+        return alpha;
     }
 
     protected static long top(Map<String, Object> parameters) {
@@ -163,9 +172,7 @@ public abstract class AbstractAlgorithm implements Algorithm {
             return 0L;
         }
         long top = parameterLong(parameters, KEY_TOP);
-        E.checkArgument(top >= 0L,
-                        "The value of %s must be >= 0, but got %s",
-                        KEY_TOP, top);
+        HugeTraverser.checkNonNegativeOrNoLimit(top, KEY_TOP);
         return top;
     }
 
@@ -196,6 +203,15 @@ public abstract class AbstractAlgorithm implements Algorithm {
         return limit;
     }
 
+    protected static long eachLimit(Map<String, Object> parameters) {
+        if (!parameters.containsKey(KEY_EACH_LIMIT)) {
+            return DEFAULT_EACH_LIMIT;
+        }
+        long limit = parameterLong(parameters, KEY_EACH_LIMIT);
+        HugeTraverser.checkPositiveOrNoLimit(limit, KEY_EACH_LIMIT);
+        return limit;
+    }
+
     protected static long sample(Map<String, Object> parameters) {
         if (!parameters.containsKey(KEY_SAMPLE)) {
             return DEFAULT_SAMPLE;
@@ -355,21 +371,24 @@ public abstract class AbstractAlgorithm implements Algorithm {
 
             Consumers<Vertex> consumers = new Consumers<>(this.executor,
                                                           consumer, done);
-            consumers.start();
+            consumers.start("task-" + this.job.task().id());
+            long total = 0L;
             try {
-                long total = 0L;
                 while (vertices.hasNext()) {
                     this.updateProgress(++this.progress);
                     total++;
                     Vertex v = vertices.next();
                     consumers.provide(v);
                 }
-                return total;
+            } catch (StopExecution e) {
+                // pass
             } catch (Throwable e) {
                 throw Consumers.wrapException(e);
             } finally {
                 consumers.await();
+                CloseableIterator.closeIterator(vertices);
             }
+            return total;
         }
 
         protected Iterator<Vertex> vertices() {
@@ -520,9 +539,11 @@ public abstract class AbstractAlgorithm implements Algorithm {
         }
 
         public void put(K key, long value) {
+            assert this.topN != 0L;
             this.tops.put(key, new MutableLong(value));
             // keep 2x buffer
-            if (this.tops.size() > this.topN * 2) {
+            if (this.tops.size() > this.topN * 2 &&
+                this.topN != HugeTraverser.NO_LIMIT) {
                 this.shrinkIfNeeded(this.topN);
             }
         }
@@ -537,7 +558,10 @@ public abstract class AbstractAlgorithm implements Algorithm {
         }
 
         private void shrinkIfNeeded(long limit) {
-            if (this.tops.size() >= limit && limit != HugeTraverser.NO_LIMIT) {
+            assert limit != 0L;
+            if (this.tops.size() >= limit &&
+                (limit > 0L || limit == HugeTraverser.NO_LIMIT)) {
+                // Just do sort if limit=NO_LIMIT, else do sort and shrink
                 this.tops = HugeTraverser.topN(this.tops, true, limit);
             }
         }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
index f5d01d980..711e95edc 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
@@ -73,14 +73,14 @@ public class Consumers<V> {
         this.queue = new ArrayBlockingQueue<>(this.queueSize);
     }
 
-    public void start() {
+    public void start(String name) {
         this.ending = false;
         this.exception = null;
         if (this.executor == null) {
             return;
         }
-        LOG.info("Starting {} workers with queue size {}...",
-                 this.workers, this.queueSize);
+        LOG.info("Starting {} workers[{}] with queue size {}...",
+                 this.workers, name, this.queueSize);
         for (int i = 0; i < this.workers; i++) {
             this.executor.submit(() -> {
                 try {
@@ -88,8 +88,10 @@ public class Consumers<V> {
                     this.done();
                 } catch (Throwable e) {
                     // Only the first exception of one thread can be stored
-                    this.exception  = e;
-                    LOG.error("Error when running task", e);
+                    this.exception = e;
+                    if (!(e instanceof StopExecution)) {
+                        LOG.error("Error when running task", e);
+                    }
                     this.done();
                 } finally {
                     this.latch.countDown();
@@ -183,4 +185,17 @@ public class Consumers<V> {
         throw new HugeException("Error when running task: %s",
                                 HugeException.rootCause(e).getMessage(), e);
     }
+
+    public static class StopExecution extends HugeException {
+
+        private static final long serialVersionUID = -371829356182454517L;
+
+        public StopExecution(String message) {
+            super(message);
+        }
+
+        public StopExecution(String message, Object... args) {
+            super(message, args);
+        }
+    }
 }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java
index 385fbbb5b..4ce97d362 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java
@@ -39,6 +39,7 @@ import com.baidu.hugegraph.job.algorithm.comm.ClusterCoeffcientAlgorithm;
 import com.baidu.hugegraph.job.algorithm.path.RingsDetectAlgorithm;
 import com.baidu.hugegraph.job.algorithm.rank.PageRankAlgorithm;
 import com.baidu.hugegraph.task.HugeTask;
+import com.baidu.hugegraph.traversal.algorithm.HugeTraverser;
 import com.baidu.hugegraph.traversal.optimize.HugeScriptTraversal;
 import com.baidu.hugegraph.util.E;
 import com.baidu.hugegraph.util.InsertionOrderUtil;
@@ -123,6 +124,7 @@ public class SubgraphStatAlgorithm extends AbstractAlgorithm {
                                                     "depth", 10L,
                                                     "degree", -1L,
                                                     "sample", -1L,
+                                                    "top", -1L /* sorted */,
                                                     "workers", 0);
 
         public Traverser(Job<Object> job) {
@@ -158,6 +160,8 @@ public class SubgraphStatAlgorithm extends AbstractAlgorithm {
             parameters = ImmutableMap.<String, Object>builder()
                                      .putAll(PARAMS)
                                      .put("count_only", true)
+                                     .put("each_limit", NO_LIMIT)
+                                     .put("limit", NO_LIMIT)
                                      .build();
             results.put("rings", algo.call(job, parameters));
 
@@ -175,6 +179,7 @@ public class SubgraphStatAlgorithm extends AbstractAlgorithm {
                 Vertex vertex = vertices.next();
                 ranks.put(vertex.id(), vertex.value(R_RANK));
             }
+            ranks = HugeTraverser.topN(ranks, true, NO_LIMIT);
             return ranks;
         }
     }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java
index 22372ad09..7b11c134c 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java
@@ -24,9 +24,12 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
 import org.apache.tinkerpop.gremlin.process.traversal.Pop;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.structure.Column;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 
@@ -130,10 +133,10 @@ public abstract class AbstractCentAlgorithm extends AbstractAlgorithm {
             return unit;
         }
 
-        protected GraphTraversal<Vertex, Vertex> filterNonShortestPath(
-                                                 GraphTraversal<Vertex, Vertex>
-                                                 t) {
-            long size = this.graph().traversal().V().limit(MAX_QUERY_LIMIT)
+        protected <V> GraphTraversal<V, V> filterNonShortestPath(
+                                           GraphTraversal<V, V> t,
+                                           boolean keepOneShortestPath) {
+            long size = this.graph().traversal().V().limit(100000L)
                                                     .count().next();
             Map<Pair<Id, Id>, Integer> triples = new HashMap<>((int) size);
             return t.filter(it -> {
@@ -142,15 +145,32 @@ public abstract class AbstractCentAlgorithm extends AbstractAlgorithm {
                 int len = it.<List<?>>path(Pop.all, "v").size();
                 Pair<Id, Id> key = Pair.of(start, end);
                 Integer shortest = triples.get(key);
-                if (shortest != null && shortest != len) {
+                if (shortest != null && len > shortest) {
                     // ignore non shortest path
                     return false;
                 }
                 if (shortest == null) {
                     triples.put(key, len);
+                } else {
+                    assert len == shortest;
+                    if (keepOneShortestPath) {
+                        return false;
+                    }
                 }
                 return true;
             });
         }
+
+        protected GraphTraversal<Vertex, ?> topN(GraphTraversal<Vertex, ?> t,
+                                                 long topN) {
+            if (topN > 0L || topN == NO_LIMIT) {
+                t = t.order(Scope.local).by(Column.values, Order.desc);
+                if (topN > 0L) {
+                    assert topN != NO_LIMIT;
+                    t = t.limit(Scope.local, topN);
+                }
+            }
+            return t;
+        }
     }
 }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java
index 370299163..40b38f655 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java
@@ -21,13 +21,10 @@ package com.baidu.hugegraph.job.algorithm.cent;
 
 import java.util.Map;
 
-import org.apache.tinkerpop.gremlin.process.traversal.Order;
 import org.apache.tinkerpop.gremlin.process.traversal.P;
 import org.apache.tinkerpop.gremlin.process.traversal.Pop;
-import org.apache.tinkerpop.gremlin.process.traversal.Scope;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
-import org.apache.tinkerpop.gremlin.structure.Column;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 
 import com.baidu.hugegraph.job.Job;
@@ -72,7 +69,7 @@ public class BetweenessCentralityAlgorithm extends AbstractCentAlgorithm {
                                            long topN) {
             assert depth > 0;
             assert degree > 0L || degree == NO_LIMIT;
-            assert topN >= 0L;
+            assert topN >= 0L || topN == NO_LIMIT;
 
             GraphTraversal<Vertex, Vertex> t = constructSource(sourceLabel,
                                                                sourceSample,
@@ -80,14 +77,11 @@ public class BetweenessCentralityAlgorithm extends AbstractCentAlgorithm {
             t = constructPath(t, direction, label, degree, sample,
                               sourceLabel, sourceCLabel);
             t = t.emit().until(__.loops().is(P.gte(depth)));
-            t = filterNonShortestPath(t);
+            t = filterNonShortestPath(t, false);
 
             GraphTraversal<Vertex, ?> tg = t.select(Pop.all, "v")
-                                            .unfold().id()
-                                            .groupCount().order(Scope.local)
-                                            .by(Column.values, Order.desc);
-            GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tg :
-                                               tg.limit(Scope.local, topN);
+                                            .unfold().id().groupCount();
+            GraphTraversal<Vertex, ?> tLimit = topN(tg, topN);
 
             return this.execute(tLimit, () -> tLimit.next());
         }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java
index 56d61504a..9a25b6394 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java
@@ -22,13 +22,11 @@ package com.baidu.hugegraph.job.algorithm.cent;
 import java.util.Map;
 
 import org.apache.tinkerpop.gremlin.process.traversal.Operator;
-import org.apache.tinkerpop.gremlin.process.traversal.Order;
 import org.apache.tinkerpop.gremlin.process.traversal.P;
 import org.apache.tinkerpop.gremlin.process.traversal.Pop;
 import org.apache.tinkerpop.gremlin.process.traversal.Scope;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
-import org.apache.tinkerpop.gremlin.structure.Column;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 
 import com.baidu.hugegraph.job.Job;
@@ -81,7 +79,7 @@ public class ClosenessCentralityAlgorithm extends AbstractCentAlgorithm {
                                           long topN) {
             assert depth > 0;
             assert degree > 0L || degree == NO_LIMIT;
-            assert topN >= 0L;
+            assert topN >= 0L || topN == NO_LIMIT;
 
             GraphTraversal<Vertex, Vertex> t = constructSource(sourceLabel,
                                                                sourceSample,
@@ -89,15 +87,13 @@ public class ClosenessCentralityAlgorithm extends AbstractCentAlgorithm {
             t = constructPath(t, direction, label, degree, sample,
                               sourceLabel, sourceCLabel);
             t = t.emit().until(__.loops().is(P.gte(depth)));
-            t = filterNonShortestPath(t);
+            t = filterNonShortestPath(t, true);
 
             GraphTraversal<Vertex, ?> tg;
             tg = t.group().by(__.select(Pop.first, "v").id())
                           .by(__.select(Pop.all, "v").count(Scope.local)
-                                .sack(Operator.div).sack().sum())
-                          .order(Scope.local).by(Column.values, Order.desc);
-            GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tg :
-                                               tg.limit(Scope.local, topN);
+                                .sack(Operator.div).sack().sum());
+            GraphTraversal<Vertex, ?> tLimit = topN(tg, topN);
 
             return this.execute(tLimit, () -> tLimit.next());
         }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java
index f29a6301d..01b3e5c4b 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java
@@ -64,10 +64,10 @@ public class DegreeCentralityAlgorithm extends AbstractCentAlgorithm {
                                        String label,
                                        long topN) {
             if (direction == null || direction == Directions.BOTH) {
-                return degreeCentrality(label, topN);
+                return this.degreeCentralityForBothDir(label, topN);
             }
             assert direction == Directions.OUT || direction == Directions.IN;
-            assert topN >= 0L;
+            assert topN >= 0L || topN == NO_LIMIT;
 
             Iterator<Edge> edges = this.edges(direction);
 
@@ -76,12 +76,12 @@ public class DegreeCentralityAlgorithm extends AbstractCentAlgorithm {
             Id vertex = null;
             Id labelId = this.getEdgeLabelId(label);
             long degree = 0L;
-            long total = 0L;
+            long totalEdges = 0L;
 
             degrees.startObject();
             while (edges.hasNext()) {
                 HugeEdge edge = (HugeEdge) edges.next();
-                this.updateProgress(++total);
+                this.updateProgress(++totalEdges);
 
                 Id schemaLabel = edge.schemaLabel().id();
                 if (labelId != null && !labelId.equals(schemaLabel)) {
@@ -97,7 +97,7 @@ public class DegreeCentralityAlgorithm extends AbstractCentAlgorithm {
 
                 if (vertex != null) {
                     // next vertex found
-                    if (topN <= 0L) {
+                    if (topN <= 0L && topN != NO_LIMIT) {
                         degrees.append(vertex, degree);
                     } else {
                         tops.put(vertex, degree);
@@ -108,7 +108,7 @@ public class DegreeCentralityAlgorithm extends AbstractCentAlgorithm {
             }
 
             if (vertex != null) {
-                if (topN <= 0L) {
+                if (topN <= 0L && topN != NO_LIMIT) {
                     degrees.append(vertex, degree);
                 } else {
                     tops.put(vertex, degree);
@@ -121,9 +121,9 @@ public class DegreeCentralityAlgorithm extends AbstractCentAlgorithm {
             return degrees.asJson();
         }
 
-        protected Object degreeCentrality(String label, long topN) {
-            assert topN >= 0L;
-            long total = 0L;
+        protected Object degreeCentralityForBothDir(String label, long topN) {
+            assert topN >= 0L || topN == NO_LIMIT;
+            long totalVertices = 0L;
             JsonMap degrees = new JsonMap();
             TopMap<Id> tops = new TopMap<>(topN);
 
@@ -132,11 +132,11 @@ public class DegreeCentralityAlgorithm extends AbstractCentAlgorithm {
             degrees.startObject();
             while (vertices.hasNext()) {
                 Id source = (Id) vertices.next().id();
-                this.updateProgress(++total);
+                this.updateProgress(++totalVertices);
 
                 long degree = this.degree(source, label);
                 if (degree > 0L) {
-                    if (topN <= 0L) {
+                    if (topN <= 0L && topN != NO_LIMIT) {
                         degrees.append(source, degree);
                     } else {
                         tops.put(source, degree);
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java
index ec065fa07..0f695a1fb 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java
@@ -21,11 +21,8 @@ package com.baidu.hugegraph.job.algorithm.cent;
 
 import java.util.Map;
 
-import org.apache.tinkerpop.gremlin.process.traversal.Order;
-import org.apache.tinkerpop.gremlin.process.traversal.Scope;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
-import org.apache.tinkerpop.gremlin.structure.Column;
 import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 
@@ -74,7 +71,7 @@ public class EigenvectorCentralityAlgorithm extends AbstractCentAlgorithm {
                                             long topN) {
             assert depth > 0;
             assert degree > 0L || degree == NO_LIMIT;
-            assert topN >= 0L;
+            assert topN >= 0L || topN == NO_LIMIT;
 
             // TODO: support parameters: Directions dir, String label
             /*
@@ -96,10 +93,8 @@ public class EigenvectorCentralityAlgorithm extends AbstractCentAlgorithm {
             t = t.repeat(__.groupCount("m").by(T.id)
                            .local(unit).simplePath()).times(depth);
 
-            GraphTraversal<Vertex, Object> tCap;
-            tCap = t.cap("m").order(Scope.local).by(Column.values, Order.desc);
-            GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tCap :
-                                               tCap.limit(Scope.local, topN);
+            GraphTraversal<Vertex, Object> tCap = t.cap("m");
+            GraphTraversal<Vertex, ?> tLimit = topN(tCap, topN);
 
             return this.execute(tLimit, () -> tLimit.next());
         }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java
index 7ac30cd2d..3f3a26c3c 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java
@@ -23,33 +23,48 @@ import java.util.Map;
 
 import com.baidu.hugegraph.job.Job;
 import com.baidu.hugegraph.type.define.Directions;
+import com.baidu.hugegraph.util.E;
 import com.baidu.hugegraph.util.InsertionOrderUtil;
 
 public class ClusterCoeffcientAlgorithm extends AbstractCommAlgorithm {
 
+    public static final String ALGO_NAME = "cluster_coeffcient";
+
     @Override
     public String name() {
-        return "cluster_coeffcient";
+        return ALGO_NAME;
     }
 
     @Override
     public void checkParameters(Map<String, Object> parameters) {
-        directionOutIn(parameters);
+        direction(parameters);
         degree(parameters);
+        workersWhenBoth(parameters);
     }
 
     @Override
     public Object call(Job<Object> job, Map<String, Object> parameters) {
-        try (Traverser traverser = new Traverser(job)) {
-            return traverser.clusterCoeffcient(directionOutIn(parameters),
+        int workers = workersWhenBoth(parameters);
+        try (Traverser traverser = new Traverser(job, workers)) {
+            return traverser.clusterCoeffcient(direction(parameters),
                                                degree(parameters));
         }
     }
 
+    protected static int workersWhenBoth(Map<String, Object> parameters) {
+        Directions direction = direction(parameters);
+        int workers = workers(parameters);
+        E.checkArgument(direction == Directions.BOTH || workers <= 0,
+                        "The workers must be not set when direction!=BOTH, " +
+                        "but got workers=%s and direction=%s",
+                        workers, direction);
+        return workers;
+    }
+
     private static class Traverser extends TriangleCountAlgorithm.Traverser {
 
-        public Traverser(Job<Object> job) {
-            super(job);
+        public Traverser(Job<Object> job, int workers) {
+            super(job, ALGO_NAME, workers);
         }
 
         public Object clusterCoeffcient(Directions direction, long degree) {
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java
index 923c1a2a3..52ddeeb71 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java
@@ -44,6 +44,8 @@ import com.google.common.collect.ImmutableSet;
 
 public class KCoreAlgorithm extends AbstractCommAlgorithm {
 
+    public static final String ALGO_NAME = "k_core";
+
     public static final String KEY_K = "k";
     public static final String KEY_MERGED = "merged";
 
@@ -51,7 +53,7 @@ public class KCoreAlgorithm extends AbstractCommAlgorithm {
 
     @Override
     public String name() {
-        return "k_core";
+        return ALGO_NAME;
     }
 
     @Override
@@ -101,7 +103,7 @@ public class KCoreAlgorithm extends AbstractCommAlgorithm {
     private static class Traverser extends AlgoTraverser {
 
         public Traverser(Job<Object> job, int workers) {
-            super(job, "kcore", workers);
+            super(job, ALGO_NAME, workers);
         }
 
         public Object kcore(String sourceLabel, String sourceCLabel,
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java
index 446ab2686..8eee9f43e 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java
@@ -26,9 +26,11 @@ import com.baidu.hugegraph.traversal.algorithm.HugeTraverser;
 
 public class LouvainAlgorithm extends AbstractCommAlgorithm {
 
+    public static final String ALGO_NAME = "louvain";
+
     @Override
     public String name() {
-        return "louvain";
+        return ALGO_NAME;
     }
 
     @Override
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
index 5d7548aa3..6135d1d40 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
@@ -85,7 +85,7 @@ public class LouvainTraverser extends AlgoTraverser {
 
     public LouvainTraverser(Job<Object> job, int workers, long degree,
                             String sourceLabel, String sourceCLabel) {
-        super(job, "louvain", workers);
+        super(job, LouvainAlgorithm.ALGO_NAME, workers);
         this.g = this.graph().traversal();
         this.sourceLabel = sourceLabel;
         this.sourceCLabel = sourceCLabel;
@@ -422,7 +422,7 @@ public class LouvainTraverser extends AlgoTraverser {
             }
         });
 
-        consumers.start();
+        consumers.start("louvain-move-pass-" + pass);
         try {
             while (vertices.hasNext()) {
                 this.updateProgress(++this.progress);
@@ -460,7 +460,7 @@ public class LouvainTraverser extends AlgoTraverser {
             this.graph().tx().commit();
         });
 
-        consumers.start();
+        consumers.start("louvain-merge-pass-" + pass);
         try {
             for (Pair<Community, Set<Id>> pair : comms) {
                 Community c = pair.getLeft();
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
index e15665cfc..0f3506a15 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
@@ -41,9 +41,11 @@ import com.google.common.collect.ImmutableMap;
 
 public class LpaAlgorithm extends AbstractCommAlgorithm {
 
+    public static final String ALGO_NAME = "lpa";
+
     @Override
     public String name() {
-        return "lpa";
+        return ALGO_NAME;
     }
 
     @Override
@@ -87,7 +89,7 @@ public class LpaAlgorithm extends AbstractCommAlgorithm {
         private final Random R = new Random();
 
         public Traverser(Job<Object> job, int workers) {
-            super(job, "lpa", workers);
+            super(job, ALGO_NAME, workers);
         }
 
         public Object lpa(String sourceLabel, String edgeLabel,
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java
index 0adb4707c..d8a17653c 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java
@@ -19,48 +19,70 @@
 
 package com.baidu.hugegraph.job.algorithm.comm;
 
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 
 import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.backend.id.IdGenerator;
 import com.baidu.hugegraph.job.Job;
 import com.baidu.hugegraph.structure.HugeEdge;
 import com.baidu.hugegraph.type.define.Directions;
+import com.baidu.hugegraph.util.E;
 import com.baidu.hugegraph.util.InsertionOrderUtil;
 import com.google.common.collect.ImmutableMap;
 
 public class TriangleCountAlgorithm extends AbstractCommAlgorithm {
 
+    public static final String ALGO_NAME = "triangle_count";
+
     @Override
     public String name() {
-        return "triangle_count";
+        return ALGO_NAME;
     }
 
     @Override
     public void checkParameters(Map<String, Object> parameters) {
-        directionOutIn(parameters);
+        direction4Out(parameters);
         degree(parameters);
+        workersWhenBoth(parameters);
     }
 
     @Override
     public Object call(Job<Object> job, Map<String, Object> parameters) {
-        try (Traverser traverser = new Traverser(job)) {
-            return traverser.triangleCount(directionOutIn(parameters),
+        int workers = workersWhenBoth(parameters);
+        try (Traverser traverser = new Traverser(job, workers)) {
+            return traverser.triangleCount(direction4Out(parameters),
                                            degree(parameters));
         }
     }
 
+    protected static int workersWhenBoth(Map<String, Object> parameters) {
+        Directions direction = direction4Out(parameters);
+        int workers = workers(parameters);
+        E.checkArgument(direction == Directions.BOTH || workers <= 0,
+                        "The workers must be not set when direction!=BOTH, " +
+                        "but got workers=%s and direction=%s",
+                        workers, direction);
+        return workers;
+    }
+
     protected static class Traverser extends AlgoTraverser {
 
         protected static final String KEY_TRIANGLES = "triangles";
         protected static final String KEY_TRIADS = "triads";
 
-        public Traverser(Job<Object> job) {
-            super(job);
+        public Traverser(Job<Object> job, int workers) {
+            super(job, ALGO_NAME, workers);
+        }
+
+        protected Traverser(Job<Object> job, String name, int workers) {
+            super(job, name, workers);
         }
 
         public Object triangleCount(Directions direction, long degree) {
@@ -73,22 +95,23 @@ public class TriangleCountAlgorithm extends AbstractCommAlgorithm {
         protected Map<String, Long> triangles(Directions direction,
                                               long degree) {
             if (direction == null || direction == Directions.BOTH) {
-                throw new IllegalArgumentException("Direction must be OUT/IN");
+                return this.trianglesForBothDir(degree);
             }
+
             assert direction == Directions.OUT || direction == Directions.IN;
 
             Iterator<Edge> edges = this.edges(direction);
 
             long triangles = 0L;
             long triads = 0L;
-            long total = 0L;
+            long totalEdges = 0L;
             long totalVertices = 0L;
             Id vertex = null;
 
-            Set<Id> adjVertices = new HashSet<>();
+            Set<Id> adjVertices = newOrderedSet();
             while (edges.hasNext()) {
                 HugeEdge edge = (HugeEdge) edges.next();
-                this.updateProgress(++total);
+                this.updateProgress(++totalEdges);
 
                 Id source = edge.ownerVertex().id();
                 Id target = edge.otherVertex().id();
@@ -108,37 +131,97 @@ public class TriangleCountAlgorithm extends AbstractCommAlgorithm {
                      *      B -> [D,F]
                      *      E -> [B,C,F]
                      */
-                    triangles += this.intersect(direction, degree, adjVertices);
+                    triangles += this.intersect(degree, adjVertices);
                     triads += this.localTriads(adjVertices.size());
                     totalVertices++;
                     // Reset for the next source
-                    adjVertices = new HashSet<>();
+                    adjVertices = newOrderedSet();
                 }
                 vertex = source;
                 adjVertices.add(target);
             }
 
             if (vertex != null) {
-                triangles += this.intersect(direction, degree, adjVertices);
+                triangles += this.intersect(degree, adjVertices);
                 triads += this.localTriads(adjVertices.size());
                 totalVertices++;
             }
 
             String suffix = "_" + direction.string();
-            return ImmutableMap.of("edges" + suffix, total,
+            return ImmutableMap.of("edges" + suffix, totalEdges,
                                    "vertices" + suffix, totalVertices,
                                    KEY_TRIANGLES, triangles,
                                    KEY_TRIADS, triads);
         }
 
-        protected long intersect(Directions dir, long degree,
-                                 Set<Id> adjVertices) {
+        protected Map<String, Long> trianglesForBothDir(long degree) {
+            AtomicLong triangles = new AtomicLong(0L);
+            AtomicLong triads = new AtomicLong(0L);
+            AtomicLong totalEdges = new AtomicLong(0L);
+            AtomicLong totalVertices = new AtomicLong(0L);
+
+            this.traverse(null, null, v -> {
+                Id source = (Id) v.id();
+
+                MutableLong edgesCount = new MutableLong(0L);
+                Set<Id> adjVertices = this.adjacentVertices(source, degree,
+                                                            edgesCount);
+
+                triangles.addAndGet(this.intersect(degree, adjVertices));
+                triads.addAndGet(this.localTriads(adjVertices.size()));
+
+                totalVertices.incrementAndGet();
+                totalEdges.addAndGet(edgesCount.longValue());
+            });
+
+            assert totalEdges.get() % 2L == 0L;
+            assert triangles.get() % 3L == 0L;
+            // totalEdges /= 2L
+            totalEdges.getAndAccumulate(2L, (l, w) -> l / w);
+            // triangles /= 3L
+            triangles.getAndAccumulate(3L, (l, w) -> l / w);
+            // triads -= triangles * 2L
+            triads.addAndGet(triangles.get() * -2L);
+
+            return ImmutableMap.of("edges", totalEdges.get(),
+                                   "vertices", totalVertices.get(),
+                                   KEY_TRIANGLES, triangles.get(),
+                                   KEY_TRIADS, triads.get());
+        }
+
+        private Set<Id> adjacentVertices(Id source, long degree,
+                                         MutableLong edgesCount) {
+            Iterator<Id> adjVertices = this.adjacentVertices(source,
+                                                             Directions.BOTH,
+                                                             null, degree);
+            Set<Id> set = newOrderedSet();
+            while (adjVertices.hasNext()) {
+                edgesCount.increment();
+                set.add(adjVertices.next());
+            }
+            return set;
+        }
+
+        protected long intersect(long degree, Set<Id> adjVertices) {
             long count = 0L;
+            Directions dir = Directions.OUT;
+            Id empty = IdGenerator.of(0);
             Iterator<Id> vertices;
             for (Id v : adjVertices) {
                 vertices = this.adjacentVertices(v, dir, null, degree);
+                Id lastVertex = empty;
                 while (vertices.hasNext()) {
                     Id vertex = vertices.next();
+                    if (lastVertex.equals(vertex)) {
+                        // Skip duplicated target vertex (through sortkeys)
+                        continue;
+                    }
+                    lastVertex = vertex;
+
+                    /*
+                     * FIXME: deduplicate two edges with opposite directions
+                     * between two specified adjacent vertices
+                     */
                     if (adjVertices.contains(vertex)) {
                         count++;
                     }
@@ -150,5 +233,9 @@ public class TriangleCountAlgorithm extends AbstractCommAlgorithm {
         protected long localTriads(int size) {
             return size * (size - 1L) / 2L;
         }
+
+        protected static <V> Set<V> newOrderedSet() {
+            return new TreeSet<>();
+        }
     }
 }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java
index 3d5bd163e..bbb028efc 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java
@@ -20,34 +20,37 @@
 package com.baidu.hugegraph.job.algorithm.path;
 
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.baidu.hugegraph.backend.id.Id;
 import com.baidu.hugegraph.job.Job;
 import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
+import com.baidu.hugegraph.job.algorithm.Consumers.StopExecution;
 import com.baidu.hugegraph.traversal.algorithm.SubGraphTraverser;
 import com.baidu.hugegraph.type.define.Directions;
 import com.baidu.hugegraph.util.JsonUtil;
 
 public class RingsDetectAlgorithm extends AbstractAlgorithm {
 
+    public static final String ALGO_NAME = "rings";
+
     public static final String KEY_COUNT_ONLY = "count_only";
 
     @Override
-    public String name() {
-        return "rings_detect";
+    public String category() {
+        return CATEGORY_PATH;
     }
 
     @Override
-    public String category() {
-        return CATEGORY_PATH;
+    public String name() {
+        return ALGO_NAME;
     }
 
     @Override
     public void checkParameters(Map<String, Object> parameters) {
         depth(parameters);
         degree(parameters);
-        capacity(parameters);
+        eachLimit(parameters);
         limit(parameters);
         sourceLabel(parameters);
         sourceCLabel(parameters);
@@ -67,13 +70,13 @@ public class RingsDetectAlgorithm extends AbstractAlgorithm {
                                    edgeLabel(parameters),
                                    depth(parameters),
                                    degree(parameters),
-                                   capacity(parameters),
+                                   eachLimit(parameters),
                                    limit(parameters),
                                    countOnly(parameters));
         }
     }
 
-    public boolean countOnly(Map<String, Object> parameters) {
+    protected boolean countOnly(Map<String, Object> parameters) {
         if (!parameters.containsKey(KEY_COUNT_ONLY)) {
             return false;
         }
@@ -83,12 +86,12 @@ public class RingsDetectAlgorithm extends AbstractAlgorithm {
     private static class Traverser extends AlgoTraverser {
 
         public Traverser(Job<Object> job, int workers) {
-            super(job, "ring", workers);
+            super(job, ALGO_NAME, workers);
         }
 
         public Object rings(String sourceLabel, String sourceCLabel,
                             Directions dir, String label, int depth,
-                            long degree, long capacity, long limit,
+                            long degree, long eachLimit, long limit,
                             boolean countOnly) {
             JsonMap ringsJson = new JsonMap();
             ringsJson.startObject();
@@ -100,24 +103,24 @@ public class RingsDetectAlgorithm extends AbstractAlgorithm {
             }
 
             SubGraphTraverser traverser = new SubGraphTraverser(this.graph());
-            AtomicInteger count = new AtomicInteger(0);
+            AtomicLong count = new AtomicLong(0L);
 
             this.traverse(sourceLabel, sourceCLabel, v -> {
                 Id source = (Id) v.id();
                 PathSet rings = traverser.rings(source, dir, label, depth,
-                                                true, degree, capacity, limit);
+                                                true, degree, MAX_CAPACITY,
+                                                eachLimit);
+                assert eachLimit == NO_LIMIT || rings.size() <= eachLimit;
                 for (Path ring : rings) {
-                    Id min = null;
-                    for (Id id : ring.vertices()) {
-                        if (min == null || id.compareTo(min) < 0) {
-                            min = id;
-                        }
+                    if (eachLimit == NO_LIMIT && !ring.ownedBy(source)) {
+                        // Only dedup rings when each_limit==NO_LIMIT
+                        continue;
                     }
-                    if (source.equals(min)) {
-                        if (countOnly) {
-                            count.incrementAndGet();
-                            continue;
-                        }
+
+                    if (count.incrementAndGet() > limit && limit != NO_LIMIT) {
+                        throw new StopExecution("exceed limit %s", limit);
+                    }
+                    if (!countOnly) {
                         String ringJson = JsonUtil.toJson(ring.vertices());
                         synchronized (ringsJson) {
                             ringsJson.appendRaw(ringJson);
@@ -125,8 +128,14 @@ public class RingsDetectAlgorithm extends AbstractAlgorithm {
                     }
                 }
             });
+
             if (countOnly) {
-                ringsJson.append(count.get());
+                long counted = count.get();
+                if (limit != NO_LIMIT && counted > limit) {
+                    // The count may exceed limit before workers stop: clamp it
+                    counted = limit;
+                }
+                ringsJson.append(counted);
             } else {
                 ringsJson.endList();
             }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java
index 82294f48b..fbaca4960 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java
@@ -20,12 +20,14 @@
 package com.baidu.hugegraph.job.algorithm.similarity;
 
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import com.baidu.hugegraph.HugeGraph;
 import com.baidu.hugegraph.job.Job;
 import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
+import com.baidu.hugegraph.job.algorithm.Consumers.StopExecution;
 import com.baidu.hugegraph.schema.EdgeLabel;
 import com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser;
 import com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser.SimilarsMap;
@@ -35,24 +37,27 @@ import com.baidu.hugegraph.util.JsonUtil;
 
 public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
 
+    public static final String ALGO_NAME = "fusiform_similarity";
+
     public static final String KEY_MIN_NEIGHBORS = "min_neighbors";
     public static final String KEY_MIN_SIMILARS = "min_similars";
+    public static final String KEY_TOP_SIMILARS = "top_similars";
     public static final String KEY_GROUP_PROPERTY = "group_property";
     public static final String KEY_MIN_GROUPS = "min_groups";
 
     public static final int DEFAULT_MIN_NEIGHBORS = 10;
     public static final int DEFAULT_MIN_SIMILARS = 6;
+    public static final int DEFAULT_TOP_SIMILARS = 0;
     public static final int DEFAULT_MIN_GROUPS = 1;
-    public static final long DEFAULT_LIMIT = -1L;
 
     @Override
-    public String name() {
-        return "fusiform_similarity";
+    public String category() {
+        return CATEGORY_SIMI;
     }
 
     @Override
-    public String category() {
-        return CATEGORY_SIMI;
+    public String name() {
+        return ALGO_NAME;
     }
 
     @Override
@@ -60,11 +65,10 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
         minNeighbors(parameters);
         alpha(parameters);
         minSimilars(parameters);
-        top(parameters);
+        topSimilars(parameters);
         groupProperty(parameters);
         minGroups(parameters);
         degree(parameters);
-        capacity(parameters);
         limit(parameters);
         sourceLabel(parameters);
         sourceCLabel(parameters);
@@ -84,11 +88,10 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
                                               minNeighbors(parameters),
                                               alpha(parameters),
                                               minSimilars(parameters),
-                                              top(parameters),
+                                              topSimilars(parameters),
                                               groupProperty(parameters),
                                               minGroups(parameters),
                                               degree(parameters),
-                                              capacity(parameters),
                                               limit(parameters));
         }
     }
@@ -98,7 +101,7 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
             return DEFAULT_MIN_NEIGHBORS;
         }
         int minNeighbors = parameterInt(parameters, KEY_MIN_NEIGHBORS);
-        HugeTraverser.checkPositive(minNeighbors, "min neighbors");
+        HugeTraverser.checkPositive(minNeighbors, KEY_MIN_NEIGHBORS);
         return minNeighbors;
     }
 
@@ -107,7 +110,16 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
             return DEFAULT_MIN_SIMILARS;
         }
         int minSimilars = parameterInt(parameters, KEY_MIN_SIMILARS);
-        HugeTraverser.checkPositive(minSimilars, "min similars");
+        HugeTraverser.checkPositive(minSimilars, KEY_MIN_SIMILARS);
+        return minSimilars;
+    }
+
+    protected static int topSimilars(Map<String, Object> parameters) {
+        if (!parameters.containsKey(KEY_TOP_SIMILARS)) {
+            return DEFAULT_TOP_SIMILARS;
+        }
+        int minSimilars = parameterInt(parameters, KEY_TOP_SIMILARS);
+        HugeTraverser.checkNonNegative(minSimilars, KEY_TOP_SIMILARS);
         return minSimilars;
     }
 
@@ -123,7 +135,7 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
             return DEFAULT_MIN_GROUPS;
         }
         int minGroups = parameterInt(parameters, KEY_MIN_GROUPS);
-        HugeTraverser.checkPositive(minGroups, "min groups");
+        HugeTraverser.checkPositive(minGroups, KEY_MIN_GROUPS);
         return minGroups;
     }
 
@@ -139,7 +151,7 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
     private static class Traverser extends AlgoTraverser {
 
         public Traverser(Job<Object> job, int workers) {
-            super(job, "fusiform", workers);
+            super(job, ALGO_NAME, workers);
         }
 
         public Object fusiformSimilars(String sourceLabel, String sourceCLabel,
@@ -147,12 +159,14 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
                                        int minNeighbors, double alpha,
                                        int minSimilars, long topSimilars,
                                        String groupProperty, int minGroups,
-                                       long degree, long capacity, long limit) {
+                                       long degree, long limit) {
             HugeGraph graph = this.graph();
             EdgeLabel edgeLabel = label == null ? null : graph.edgeLabel(label);
 
             FusiformSimilarityTraverser traverser =
                                         new FusiformSimilarityTraverser(graph);
+
+            AtomicLong count = new AtomicLong(0L);
             JsonMap similarsJson = new JsonMap();
             similarsJson.startObject();
 
@@ -162,16 +176,20 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
                                        edgeLabel, minNeighbors, alpha,
                                        minSimilars, (int) topSimilars,
                                        groupProperty, minGroups, degree,
-                                       capacity, NO_LIMIT, true);
+                                       MAX_CAPACITY, NO_LIMIT, true);
                 if (similars.isEmpty()) {
                     return;
                 }
                 String result = JsonUtil.toJson(similars.toMap());
                 result = result.substring(1, result.length() - 1);
                 synchronized (similarsJson) {
+                    if (count.incrementAndGet() > limit && limit != NO_LIMIT) {
+                        throw new StopExecution("exceed limit %s", limit);
+                    }
                     similarsJson.appendRaw(result);
                 }
-            }, null, limit);
+            });
+
             similarsJson.endObject();
 
             return similarsJson.asJson();