You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by ji...@apache.org on 2022/11/09 10:25:18 UTC
[incubator-hugegraph] 15/33: add BOTH direction support for triangle_count/cluster_coeffcient (#24)
This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit 53025bd43bfe537a033597cea51720620089c25b
Author: Jermy Li <li...@baidu.com>
AuthorDate: Wed Jun 24 17:58:34 2020 +0800
add BOTH direction support for triangle_count/cluster_coeffcient (#24)
change log:
1. add BOTH direction support for triangle_count/cluster_coeffcientith.
2. fix extra triangle count with multi edges between tow adjacent vertices.
3. set default value of direction to BOTH for degree_centrality and cluster_coeffcient .
4. add workers for triangle_count and cluster_coeffcient.
5. fix closeness: multi shortest paths cause results illogical.
6. rename rings_detect to rings, rename limit to each_limit which means limit number of rings of each source vertex, and don't do dedup if passed each_limit > 0.
7. unify top for 4 centrality algos: sorted results when top = -1, unsorted results when top = 0.
8. fusiform: rename top to top_similars (expected >= 0).
9. fusiform/rings: add limit param which means max number of results, and remove capacity param and hardcode to 100000000.
Change-Id: I9ddf8553e6d86b99adbff8b972890d69d623fa1a
---
.../hugegraph/job/algorithm/AbstractAlgorithm.java | 52 ++++++---
.../baidu/hugegraph/job/algorithm/Consumers.java | 25 ++++-
.../job/algorithm/SubgraphStatAlgorithm.java | 5 +
.../job/algorithm/cent/AbstractCentAlgorithm.java | 30 ++++-
.../cent/BetweenessCentralityAlgorithm.java | 14 +--
.../cent/ClosenessCentralityAlgorithm.java | 12 +-
.../algorithm/cent/DegreeCentralityAlgorithm.java | 22 ++--
.../cent/EigenvectorCentralityAlgorithm.java | 11 +-
.../algorithm/comm/ClusterCoeffcientAlgorithm.java | 27 ++++-
.../job/algorithm/comm/KCoreAlgorithm.java | 6 +-
.../job/algorithm/comm/LouvainAlgorithm.java | 4 +-
.../job/algorithm/comm/LouvainTraverser.java | 6 +-
.../hugegraph/job/algorithm/comm/LpaAlgorithm.java | 6 +-
.../job/algorithm/comm/TriangleCountAlgorithm.java | 121 ++++++++++++++++++---
.../job/algorithm/path/RingsDetectAlgorithm.java | 55 ++++++----
.../similarity/FusiformSimilarityAlgorithm.java | 50 ++++++---
16 files changed, 315 insertions(+), 131 deletions(-)
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
index 67d508a47..e30ce1e16 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
@@ -36,6 +36,7 @@ import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.backend.id.Id;
@@ -44,6 +45,7 @@ import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.iterator.FilterIterator;
import com.baidu.hugegraph.iterator.FlatMapperIterator;
import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.job.algorithm.Consumers.StopExecution;
import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.traversal.algorithm.HugeTraverser;
import com.baidu.hugegraph.type.HugeType;
@@ -61,6 +63,7 @@ public abstract class AbstractAlgorithm implements Algorithm {
public static final long MAX_RESULT_SIZE = 100L * Bytes.MB;
public static final long MAX_QUERY_LIMIT = 100000000L; // about 100GB
+ public static final long MAX_CAPACITY = MAX_QUERY_LIMIT;
public static final int BATCH = 500;
public static final String CATEGORY_AGGR = "aggregate";
@@ -87,11 +90,13 @@ public abstract class AbstractAlgorithm implements Algorithm {
public static final String KEY_CLEAR = "clear";
public static final String KEY_CAPACITY = "capacity";
public static final String KEY_LIMIT = "limit";
+ public static final String KEY_EACH_LIMIT = "each_limit";
public static final String KEY_ALPHA = "alpha";
public static final String KEY_WORKERS = "workers";
public static final long DEFAULT_CAPACITY = 10000000L;
public static final long DEFAULT_LIMIT = 100L;
+ public static final long DEFAULT_EACH_LIMIT = 1L;
public static final long DEFAULT_DEGREE = 100L;
public static final long DEFAULT_SAMPLE = 1L;
public static final long DEFAULT_TIMES = 20L;
@@ -131,6 +136,14 @@ public abstract class AbstractAlgorithm implements Algorithm {
return parseDirection(direction);
}
+ protected static Directions direction4Out(Map<String, Object> parameters) {
+ if (!parameters.containsKey(KEY_DIRECTION)) {
+ return Directions.OUT;
+ }
+ Object direction = parameter(parameters, KEY_DIRECTION);
+ return parseDirection(direction);
+ }
+
protected static Directions directionOutIn(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_DIRECTION)) {
return Directions.OUT;
@@ -148,14 +161,10 @@ public abstract class AbstractAlgorithm implements Algorithm {
return DEFAULT_ALPHA;
}
double alpha = parameterDouble(parameters, KEY_ALPHA);
- checkAlpha(alpha);
- return alpha;
- }
-
- public static void checkAlpha(double alpha) {
- E.checkArgument(alpha > 0 && alpha <= 1.0,
+ E.checkArgument(alpha > 0.0 && alpha <= 1.0,
"The value of %s must be in range (0, 1], but got %s",
KEY_ALPHA, alpha);
+ return alpha;
}
protected static long top(Map<String, Object> parameters) {
@@ -163,9 +172,7 @@ public abstract class AbstractAlgorithm implements Algorithm {
return 0L;
}
long top = parameterLong(parameters, KEY_TOP);
- E.checkArgument(top >= 0L,
- "The value of %s must be >= 0, but got %s",
- KEY_TOP, top);
+ HugeTraverser.checkNonNegativeOrNoLimit(top, KEY_TOP);
return top;
}
@@ -196,6 +203,15 @@ public abstract class AbstractAlgorithm implements Algorithm {
return limit;
}
+ protected static long eachLimit(Map<String, Object> parameters) {
+ if (!parameters.containsKey(KEY_EACH_LIMIT)) {
+ return DEFAULT_EACH_LIMIT;
+ }
+ long limit = parameterLong(parameters, KEY_EACH_LIMIT);
+ HugeTraverser.checkPositiveOrNoLimit(limit, KEY_EACH_LIMIT);
+ return limit;
+ }
+
protected static long sample(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_SAMPLE)) {
return DEFAULT_SAMPLE;
@@ -355,21 +371,24 @@ public abstract class AbstractAlgorithm implements Algorithm {
Consumers<Vertex> consumers = new Consumers<>(this.executor,
consumer, done);
- consumers.start();
+ consumers.start("task-" + this.job.task().id());
+ long total = 0L;
try {
- long total = 0L;
while (vertices.hasNext()) {
this.updateProgress(++this.progress);
total++;
Vertex v = vertices.next();
consumers.provide(v);
}
- return total;
+ } catch (StopExecution e) {
+ // pass
} catch (Throwable e) {
throw Consumers.wrapException(e);
} finally {
consumers.await();
+ CloseableIterator.closeIterator(vertices);
}
+ return total;
}
protected Iterator<Vertex> vertices() {
@@ -520,9 +539,11 @@ public abstract class AbstractAlgorithm implements Algorithm {
}
public void put(K key, long value) {
+ assert this.topN != 0L;
this.tops.put(key, new MutableLong(value));
// keep 2x buffer
- if (this.tops.size() > this.topN * 2) {
+ if (this.tops.size() > this.topN * 2 &&
+ this.topN != HugeTraverser.NO_LIMIT) {
this.shrinkIfNeeded(this.topN);
}
}
@@ -537,7 +558,10 @@ public abstract class AbstractAlgorithm implements Algorithm {
}
private void shrinkIfNeeded(long limit) {
- if (this.tops.size() >= limit && limit != HugeTraverser.NO_LIMIT) {
+ assert limit != 0L;
+ if (this.tops.size() >= limit &&
+ (limit > 0L || limit == HugeTraverser.NO_LIMIT)) {
+ // Just do sort if limit=NO_LIMIT, else do sort and shrink
this.tops = HugeTraverser.topN(this.tops, true, limit);
}
}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
index f5d01d980..711e95edc 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
@@ -73,14 +73,14 @@ public class Consumers<V> {
this.queue = new ArrayBlockingQueue<>(this.queueSize);
}
- public void start() {
+ public void start(String name) {
this.ending = false;
this.exception = null;
if (this.executor == null) {
return;
}
- LOG.info("Starting {} workers with queue size {}...",
- this.workers, this.queueSize);
+ LOG.info("Starting {} workers[{}] with queue size {}...",
+ this.workers, name, this.queueSize);
for (int i = 0; i < this.workers; i++) {
this.executor.submit(() -> {
try {
@@ -88,8 +88,10 @@ public class Consumers<V> {
this.done();
} catch (Throwable e) {
// Only the first exception of one thread can be stored
- this.exception = e;
- LOG.error("Error when running task", e);
+ this.exception = e;
+ if (!(e instanceof StopExecution)) {
+ LOG.error("Error when running task", e);
+ }
this.done();
} finally {
this.latch.countDown();
@@ -183,4 +185,17 @@ public class Consumers<V> {
throw new HugeException("Error when running task: %s",
HugeException.rootCause(e).getMessage(), e);
}
+
+ public static class StopExecution extends HugeException {
+
+ private static final long serialVersionUID = -371829356182454517L;
+
+ public StopExecution(String message) {
+ super(message);
+ }
+
+ public StopExecution(String message, Object... args) {
+ super(message, args);
+ }
+ }
}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java
index 385fbbb5b..4ce97d362 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java
@@ -39,6 +39,7 @@ import com.baidu.hugegraph.job.algorithm.comm.ClusterCoeffcientAlgorithm;
import com.baidu.hugegraph.job.algorithm.path.RingsDetectAlgorithm;
import com.baidu.hugegraph.job.algorithm.rank.PageRankAlgorithm;
import com.baidu.hugegraph.task.HugeTask;
+import com.baidu.hugegraph.traversal.algorithm.HugeTraverser;
import com.baidu.hugegraph.traversal.optimize.HugeScriptTraversal;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.InsertionOrderUtil;
@@ -123,6 +124,7 @@ public class SubgraphStatAlgorithm extends AbstractAlgorithm {
"depth", 10L,
"degree", -1L,
"sample", -1L,
+ "top", -1L /* sorted */,
"workers", 0);
public Traverser(Job<Object> job) {
@@ -158,6 +160,8 @@ public class SubgraphStatAlgorithm extends AbstractAlgorithm {
parameters = ImmutableMap.<String, Object>builder()
.putAll(PARAMS)
.put("count_only", true)
+ .put("each_limit", NO_LIMIT)
+ .put("limit", NO_LIMIT)
.build();
results.put("rings", algo.call(job, parameters));
@@ -175,6 +179,7 @@ public class SubgraphStatAlgorithm extends AbstractAlgorithm {
Vertex vertex = vertices.next();
ranks.put(vertex.id(), vertex.value(R_RANK));
}
+ ranks = HugeTraverser.topN(ranks, true, NO_LIMIT);
return ranks;
}
}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java
index 22372ad09..7b11c134c 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java
@@ -24,9 +24,12 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
import org.apache.tinkerpop.gremlin.process.traversal.Pop;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.structure.Column;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -130,10 +133,10 @@ public abstract class AbstractCentAlgorithm extends AbstractAlgorithm {
return unit;
}
- protected GraphTraversal<Vertex, Vertex> filterNonShortestPath(
- GraphTraversal<Vertex, Vertex>
- t) {
- long size = this.graph().traversal().V().limit(MAX_QUERY_LIMIT)
+ protected <V> GraphTraversal<V, V> filterNonShortestPath(
+ GraphTraversal<V, V> t,
+ boolean keepOneShortestPath) {
+ long size = this.graph().traversal().V().limit(100000L)
.count().next();
Map<Pair<Id, Id>, Integer> triples = new HashMap<>((int) size);
return t.filter(it -> {
@@ -142,15 +145,32 @@ public abstract class AbstractCentAlgorithm extends AbstractAlgorithm {
int len = it.<List<?>>path(Pop.all, "v").size();
Pair<Id, Id> key = Pair.of(start, end);
Integer shortest = triples.get(key);
- if (shortest != null && shortest != len) {
+ if (shortest != null && len > shortest) {
// ignore non shortest path
return false;
}
if (shortest == null) {
triples.put(key, len);
+ } else {
+ assert len == shortest;
+ if (keepOneShortestPath) {
+ return false;
+ }
}
return true;
});
}
+
+ protected GraphTraversal<Vertex, ?> topN(GraphTraversal<Vertex, ?> t,
+ long topN) {
+ if (topN > 0L || topN == NO_LIMIT) {
+ t = t.order(Scope.local).by(Column.values, Order.desc);
+ if (topN > 0L) {
+ assert topN != NO_LIMIT;
+ t = t.limit(Scope.local, topN);
+ }
+ }
+ return t;
+ }
}
}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java
index 370299163..40b38f655 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java
@@ -21,13 +21,10 @@ package com.baidu.hugegraph.job.algorithm.cent;
import java.util.Map;
-import org.apache.tinkerpop.gremlin.process.traversal.Order;
import org.apache.tinkerpop.gremlin.process.traversal.P;
import org.apache.tinkerpop.gremlin.process.traversal.Pop;
-import org.apache.tinkerpop.gremlin.process.traversal.Scope;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
-import org.apache.tinkerpop.gremlin.structure.Column;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import com.baidu.hugegraph.job.Job;
@@ -72,7 +69,7 @@ public class BetweenessCentralityAlgorithm extends AbstractCentAlgorithm {
long topN) {
assert depth > 0;
assert degree > 0L || degree == NO_LIMIT;
- assert topN >= 0L;
+ assert topN >= 0L || topN == NO_LIMIT;
GraphTraversal<Vertex, Vertex> t = constructSource(sourceLabel,
sourceSample,
@@ -80,14 +77,11 @@ public class BetweenessCentralityAlgorithm extends AbstractCentAlgorithm {
t = constructPath(t, direction, label, degree, sample,
sourceLabel, sourceCLabel);
t = t.emit().until(__.loops().is(P.gte(depth)));
- t = filterNonShortestPath(t);
+ t = filterNonShortestPath(t, false);
GraphTraversal<Vertex, ?> tg = t.select(Pop.all, "v")
- .unfold().id()
- .groupCount().order(Scope.local)
- .by(Column.values, Order.desc);
- GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tg :
- tg.limit(Scope.local, topN);
+ .unfold().id().groupCount();
+ GraphTraversal<Vertex, ?> tLimit = topN(tg, topN);
return this.execute(tLimit, () -> tLimit.next());
}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java
index 56d61504a..9a25b6394 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java
@@ -22,13 +22,11 @@ package com.baidu.hugegraph.job.algorithm.cent;
import java.util.Map;
import org.apache.tinkerpop.gremlin.process.traversal.Operator;
-import org.apache.tinkerpop.gremlin.process.traversal.Order;
import org.apache.tinkerpop.gremlin.process.traversal.P;
import org.apache.tinkerpop.gremlin.process.traversal.Pop;
import org.apache.tinkerpop.gremlin.process.traversal.Scope;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
-import org.apache.tinkerpop.gremlin.structure.Column;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import com.baidu.hugegraph.job.Job;
@@ -81,7 +79,7 @@ public class ClosenessCentralityAlgorithm extends AbstractCentAlgorithm {
long topN) {
assert depth > 0;
assert degree > 0L || degree == NO_LIMIT;
- assert topN >= 0L;
+ assert topN >= 0L || topN == NO_LIMIT;
GraphTraversal<Vertex, Vertex> t = constructSource(sourceLabel,
sourceSample,
@@ -89,15 +87,13 @@ public class ClosenessCentralityAlgorithm extends AbstractCentAlgorithm {
t = constructPath(t, direction, label, degree, sample,
sourceLabel, sourceCLabel);
t = t.emit().until(__.loops().is(P.gte(depth)));
- t = filterNonShortestPath(t);
+ t = filterNonShortestPath(t, true);
GraphTraversal<Vertex, ?> tg;
tg = t.group().by(__.select(Pop.first, "v").id())
.by(__.select(Pop.all, "v").count(Scope.local)
- .sack(Operator.div).sack().sum())
- .order(Scope.local).by(Column.values, Order.desc);
- GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tg :
- tg.limit(Scope.local, topN);
+ .sack(Operator.div).sack().sum());
+ GraphTraversal<Vertex, ?> tLimit = topN(tg, topN);
return this.execute(tLimit, () -> tLimit.next());
}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java
index f29a6301d..01b3e5c4b 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java
@@ -64,10 +64,10 @@ public class DegreeCentralityAlgorithm extends AbstractCentAlgorithm {
String label,
long topN) {
if (direction == null || direction == Directions.BOTH) {
- return degreeCentrality(label, topN);
+ return this.degreeCentralityForBothDir(label, topN);
}
assert direction == Directions.OUT || direction == Directions.IN;
- assert topN >= 0L;
+ assert topN >= 0L || topN == NO_LIMIT;
Iterator<Edge> edges = this.edges(direction);
@@ -76,12 +76,12 @@ public class DegreeCentralityAlgorithm extends AbstractCentAlgorithm {
Id vertex = null;
Id labelId = this.getEdgeLabelId(label);
long degree = 0L;
- long total = 0L;
+ long totalEdges = 0L;
degrees.startObject();
while (edges.hasNext()) {
HugeEdge edge = (HugeEdge) edges.next();
- this.updateProgress(++total);
+ this.updateProgress(++totalEdges);
Id schemaLabel = edge.schemaLabel().id();
if (labelId != null && !labelId.equals(schemaLabel)) {
@@ -97,7 +97,7 @@ public class DegreeCentralityAlgorithm extends AbstractCentAlgorithm {
if (vertex != null) {
// next vertex found
- if (topN <= 0L) {
+ if (topN <= 0L && topN != NO_LIMIT) {
degrees.append(vertex, degree);
} else {
tops.put(vertex, degree);
@@ -108,7 +108,7 @@ public class DegreeCentralityAlgorithm extends AbstractCentAlgorithm {
}
if (vertex != null) {
- if (topN <= 0L) {
+ if (topN <= 0L && topN != NO_LIMIT) {
degrees.append(vertex, degree);
} else {
tops.put(vertex, degree);
@@ -121,9 +121,9 @@ public class DegreeCentralityAlgorithm extends AbstractCentAlgorithm {
return degrees.asJson();
}
- protected Object degreeCentrality(String label, long topN) {
- assert topN >= 0L;
- long total = 0L;
+ protected Object degreeCentralityForBothDir(String label, long topN) {
+ assert topN >= 0L || topN == NO_LIMIT;
+ long totalVertices = 0L;
JsonMap degrees = new JsonMap();
TopMap<Id> tops = new TopMap<>(topN);
@@ -132,11 +132,11 @@ public class DegreeCentralityAlgorithm extends AbstractCentAlgorithm {
degrees.startObject();
while (vertices.hasNext()) {
Id source = (Id) vertices.next().id();
- this.updateProgress(++total);
+ this.updateProgress(++totalVertices);
long degree = this.degree(source, label);
if (degree > 0L) {
- if (topN <= 0L) {
+ if (topN <= 0L && topN != NO_LIMIT) {
degrees.append(source, degree);
} else {
tops.put(source, degree);
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java
index ec065fa07..0f695a1fb 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java
@@ -21,11 +21,8 @@ package com.baidu.hugegraph.job.algorithm.cent;
import java.util.Map;
-import org.apache.tinkerpop.gremlin.process.traversal.Order;
-import org.apache.tinkerpop.gremlin.process.traversal.Scope;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
-import org.apache.tinkerpop.gremlin.structure.Column;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -74,7 +71,7 @@ public class EigenvectorCentralityAlgorithm extends AbstractCentAlgorithm {
long topN) {
assert depth > 0;
assert degree > 0L || degree == NO_LIMIT;
- assert topN >= 0L;
+ assert topN >= 0L || topN == NO_LIMIT;
// TODO: support parameters: Directions dir, String label
/*
@@ -96,10 +93,8 @@ public class EigenvectorCentralityAlgorithm extends AbstractCentAlgorithm {
t = t.repeat(__.groupCount("m").by(T.id)
.local(unit).simplePath()).times(depth);
- GraphTraversal<Vertex, Object> tCap;
- tCap = t.cap("m").order(Scope.local).by(Column.values, Order.desc);
- GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tCap :
- tCap.limit(Scope.local, topN);
+ GraphTraversal<Vertex, Object> tCap = t.cap("m");
+ GraphTraversal<Vertex, ?> tLimit = topN(tCap, topN);
return this.execute(tLimit, () -> tLimit.next());
}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java
index 7ac30cd2d..3f3a26c3c 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java
@@ -23,33 +23,48 @@ import java.util.Map;
import com.baidu.hugegraph.job.Job;
import com.baidu.hugegraph.type.define.Directions;
+import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.InsertionOrderUtil;
public class ClusterCoeffcientAlgorithm extends AbstractCommAlgorithm {
+ public static final String ALGO_NAME = "cluster_coeffcient";
+
@Override
public String name() {
- return "cluster_coeffcient";
+ return ALGO_NAME;
}
@Override
public void checkParameters(Map<String, Object> parameters) {
- directionOutIn(parameters);
+ direction(parameters);
degree(parameters);
+ workersWhenBoth(parameters);
}
@Override
public Object call(Job<Object> job, Map<String, Object> parameters) {
- try (Traverser traverser = new Traverser(job)) {
- return traverser.clusterCoeffcient(directionOutIn(parameters),
+ int workers = workersWhenBoth(parameters);
+ try (Traverser traverser = new Traverser(job, workers)) {
+ return traverser.clusterCoeffcient(direction(parameters),
degree(parameters));
}
}
+ protected static int workersWhenBoth(Map<String, Object> parameters) {
+ Directions direction = direction(parameters);
+ int workers = workers(parameters);
+ E.checkArgument(direction == Directions.BOTH || workers <= 0,
+ "The workers must be not set when direction!=BOTH, " +
+ "but got workers=%s and direction=%s",
+ workers, direction);
+ return workers;
+ }
+
private static class Traverser extends TriangleCountAlgorithm.Traverser {
- public Traverser(Job<Object> job) {
- super(job);
+ public Traverser(Job<Object> job, int workers) {
+ super(job, ALGO_NAME, workers);
}
public Object clusterCoeffcient(Directions direction, long degree) {
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java
index 923c1a2a3..52ddeeb71 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java
@@ -44,6 +44,8 @@ import com.google.common.collect.ImmutableSet;
public class KCoreAlgorithm extends AbstractCommAlgorithm {
+ public static final String ALGO_NAME = "k_core";
+
public static final String KEY_K = "k";
public static final String KEY_MERGED = "merged";
@@ -51,7 +53,7 @@ public class KCoreAlgorithm extends AbstractCommAlgorithm {
@Override
public String name() {
- return "k_core";
+ return ALGO_NAME;
}
@Override
@@ -101,7 +103,7 @@ public class KCoreAlgorithm extends AbstractCommAlgorithm {
private static class Traverser extends AlgoTraverser {
public Traverser(Job<Object> job, int workers) {
- super(job, "kcore", workers);
+ super(job, ALGO_NAME, workers);
}
public Object kcore(String sourceLabel, String sourceCLabel,
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java
index 446ab2686..8eee9f43e 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java
@@ -26,9 +26,11 @@ import com.baidu.hugegraph.traversal.algorithm.HugeTraverser;
public class LouvainAlgorithm extends AbstractCommAlgorithm {
+ public static final String ALGO_NAME = "louvain";
+
@Override
public String name() {
- return "louvain";
+ return ALGO_NAME;
}
@Override
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
index 5d7548aa3..6135d1d40 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
@@ -85,7 +85,7 @@ public class LouvainTraverser extends AlgoTraverser {
public LouvainTraverser(Job<Object> job, int workers, long degree,
String sourceLabel, String sourceCLabel) {
- super(job, "louvain", workers);
+ super(job, LouvainAlgorithm.ALGO_NAME, workers);
this.g = this.graph().traversal();
this.sourceLabel = sourceLabel;
this.sourceCLabel = sourceCLabel;
@@ -422,7 +422,7 @@ public class LouvainTraverser extends AlgoTraverser {
}
});
- consumers.start();
+ consumers.start("louvain-move-pass-" + pass);
try {
while (vertices.hasNext()) {
this.updateProgress(++this.progress);
@@ -460,7 +460,7 @@ public class LouvainTraverser extends AlgoTraverser {
this.graph().tx().commit();
});
- consumers.start();
+ consumers.start("louvain-merge-pass-" + pass);
try {
for (Pair<Community, Set<Id>> pair : comms) {
Community c = pair.getLeft();
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
index e15665cfc..0f3506a15 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
@@ -41,9 +41,11 @@ import com.google.common.collect.ImmutableMap;
public class LpaAlgorithm extends AbstractCommAlgorithm {
+ public static final String ALGO_NAME = "lpa";
+
@Override
public String name() {
- return "lpa";
+ return ALGO_NAME;
}
@Override
@@ -87,7 +89,7 @@ public class LpaAlgorithm extends AbstractCommAlgorithm {
private final Random R = new Random();
public Traverser(Job<Object> job, int workers) {
- super(job, "lpa", workers);
+ super(job, ALGO_NAME, workers);
}
public Object lpa(String sourceLabel, String edgeLabel,
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java
index 0adb4707c..d8a17653c 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java
@@ -19,48 +19,70 @@
package com.baidu.hugegraph.job.algorithm.comm;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang.mutable.MutableLong;
import org.apache.tinkerpop.gremlin.structure.Edge;
import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.backend.id.IdGenerator;
import com.baidu.hugegraph.job.Job;
import com.baidu.hugegraph.structure.HugeEdge;
import com.baidu.hugegraph.type.define.Directions;
+import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.InsertionOrderUtil;
import com.google.common.collect.ImmutableMap;
public class TriangleCountAlgorithm extends AbstractCommAlgorithm {
+ public static final String ALGO_NAME = "triangle_count";
+
@Override
public String name() {
- return "triangle_count";
+ return ALGO_NAME;
}
@Override
public void checkParameters(Map<String, Object> parameters) {
- directionOutIn(parameters);
+ direction4Out(parameters);
degree(parameters);
+ workersWhenBoth(parameters);
}
@Override
public Object call(Job<Object> job, Map<String, Object> parameters) {
- try (Traverser traverser = new Traverser(job)) {
- return traverser.triangleCount(directionOutIn(parameters),
+ int workers = workersWhenBoth(parameters);
+ try (Traverser traverser = new Traverser(job, workers)) {
+ return traverser.triangleCount(direction4Out(parameters),
degree(parameters));
}
}
+ protected static int workersWhenBoth(Map<String, Object> parameters) {
+ Directions direction = direction4Out(parameters);
+ int workers = workers(parameters);
+ E.checkArgument(direction == Directions.BOTH || workers <= 0,
+ "The workers must be not set when direction!=BOTH, " +
+ "but got workers=%s and direction=%s",
+ workers, direction);
+ return workers;
+ }
+
protected static class Traverser extends AlgoTraverser {
protected static final String KEY_TRIANGLES = "triangles";
protected static final String KEY_TRIADS = "triads";
- public Traverser(Job<Object> job) {
- super(job);
+ public Traverser(Job<Object> job, int workers) {
+ super(job, ALGO_NAME, workers);
+ }
+
+ protected Traverser(Job<Object> job, String name, int workers) {
+ super(job, name, workers);
}
public Object triangleCount(Directions direction, long degree) {
@@ -73,22 +95,23 @@ public class TriangleCountAlgorithm extends AbstractCommAlgorithm {
protected Map<String, Long> triangles(Directions direction,
long degree) {
if (direction == null || direction == Directions.BOTH) {
- throw new IllegalArgumentException("Direction must be OUT/IN");
+ return this.trianglesForBothDir(degree);
}
+
assert direction == Directions.OUT || direction == Directions.IN;
Iterator<Edge> edges = this.edges(direction);
long triangles = 0L;
long triads = 0L;
- long total = 0L;
+ long totalEdges = 0L;
long totalVertices = 0L;
Id vertex = null;
- Set<Id> adjVertices = new HashSet<>();
+ Set<Id> adjVertices = newOrderedSet();
while (edges.hasNext()) {
HugeEdge edge = (HugeEdge) edges.next();
- this.updateProgress(++total);
+ this.updateProgress(++totalEdges);
Id source = edge.ownerVertex().id();
Id target = edge.otherVertex().id();
@@ -108,37 +131,97 @@ public class TriangleCountAlgorithm extends AbstractCommAlgorithm {
* B -> [D,F]
* E -> [B,C,F]
*/
- triangles += this.intersect(direction, degree, adjVertices);
+ triangles += this.intersect(degree, adjVertices);
triads += this.localTriads(adjVertices.size());
totalVertices++;
// Reset for the next source
- adjVertices = new HashSet<>();
+ adjVertices = newOrderedSet();
}
vertex = source;
adjVertices.add(target);
}
if (vertex != null) {
- triangles += this.intersect(direction, degree, adjVertices);
+ triangles += this.intersect(degree, adjVertices);
triads += this.localTriads(adjVertices.size());
totalVertices++;
}
String suffix = "_" + direction.string();
- return ImmutableMap.of("edges" + suffix, total,
+ return ImmutableMap.of("edges" + suffix, totalEdges,
"vertices" + suffix, totalVertices,
KEY_TRIANGLES, triangles,
KEY_TRIADS, triads);
}
- protected long intersect(Directions dir, long degree,
- Set<Id> adjVertices) {
+ protected Map<String, Long> trianglesForBothDir(long degree) {
+ AtomicLong triangles = new AtomicLong(0L);
+ AtomicLong triads = new AtomicLong(0L);
+ AtomicLong totalEdges = new AtomicLong(0L);
+ AtomicLong totalVertices = new AtomicLong(0L);
+
+ this.traverse(null, null, v -> {
+ Id source = (Id) v.id();
+
+ MutableLong edgesCount = new MutableLong(0L);
+ Set<Id> adjVertices = this.adjacentVertices(source, degree,
+ edgesCount);
+
+ triangles.addAndGet(this.intersect(degree, adjVertices));
+ triads.addAndGet(this.localTriads(adjVertices.size()));
+
+ totalVertices.incrementAndGet();
+ totalEdges.addAndGet(edgesCount.longValue());
+ });
+
+ assert totalEdges.get() % 2L == 0L;
+ assert triangles.get() % 3L == 0L;
+ // totalEdges /= 2L
+ totalEdges.getAndAccumulate(2L, (l, w) -> l / w);
+ // triangles /= 3L
+ triangles.getAndAccumulate(3L, (l, w) -> l / w);
+ // triads -= triangles * 2L
+ triads.addAndGet(triangles.get() * -2L);
+
+ return ImmutableMap.of("edges", totalEdges.get(),
+ "vertices", totalVertices.get(),
+ KEY_TRIANGLES, triangles.get(),
+ KEY_TRIADS, triads.get());
+ }
+
+ private Set<Id> adjacentVertices(Id source, long degree,
+ MutableLong edgesCount) {
+ Iterator<Id> adjVertices = this.adjacentVertices(source,
+ Directions.BOTH,
+ null, degree);
+ Set<Id> set = newOrderedSet();
+ while (adjVertices.hasNext()) {
+ edgesCount.increment();
+ set.add(adjVertices.next());
+ }
+ return set;
+ }
+
+ protected long intersect(long degree, Set<Id> adjVertices) {
long count = 0L;
+ Directions dir = Directions.OUT;
+ Id empty = IdGenerator.of(0);
Iterator<Id> vertices;
for (Id v : adjVertices) {
vertices = this.adjacentVertices(v, dir, null, degree);
+ Id lastVertex = empty;
while (vertices.hasNext()) {
Id vertex = vertices.next();
+ if (lastVertex.equals(vertex)) {
+ // Skip duplicated target vertex (through sortkeys)
+ continue;
+ }
+ lastVertex = vertex;
+
+ /*
+ * FIXME: deduplicate two edges with opposite directions
+ * between two specified adjacent vertices
+ */
if (adjVertices.contains(vertex)) {
count++;
}
@@ -150,5 +233,9 @@ public class TriangleCountAlgorithm extends AbstractCommAlgorithm {
protected long localTriads(int size) {
return size * (size - 1L) / 2L;
}
+
+ protected static <V> Set<V> newOrderedSet() {
+ return new TreeSet<>();
+ }
}
}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java
index 3d5bd163e..bbb028efc 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java
@@ -20,34 +20,37 @@
package com.baidu.hugegraph.job.algorithm.path;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.job.Job;
import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
+import com.baidu.hugegraph.job.algorithm.Consumers.StopExecution;
import com.baidu.hugegraph.traversal.algorithm.SubGraphTraverser;
import com.baidu.hugegraph.type.define.Directions;
import com.baidu.hugegraph.util.JsonUtil;
public class RingsDetectAlgorithm extends AbstractAlgorithm {
+ public static final String ALGO_NAME = "rings";
+
public static final String KEY_COUNT_ONLY = "count_only";
@Override
- public String name() {
- return "rings_detect";
+ public String category() {
+ return CATEGORY_PATH;
}
@Override
- public String category() {
- return CATEGORY_PATH;
+ public String name() {
+ return ALGO_NAME;
}
@Override
public void checkParameters(Map<String, Object> parameters) {
depth(parameters);
degree(parameters);
- capacity(parameters);
+ eachLimit(parameters);
limit(parameters);
sourceLabel(parameters);
sourceCLabel(parameters);
@@ -67,13 +70,13 @@ public class RingsDetectAlgorithm extends AbstractAlgorithm {
edgeLabel(parameters),
depth(parameters),
degree(parameters),
- capacity(parameters),
+ eachLimit(parameters),
limit(parameters),
countOnly(parameters));
}
}
- public boolean countOnly(Map<String, Object> parameters) {
+ protected boolean countOnly(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_COUNT_ONLY)) {
return false;
}
@@ -83,12 +86,12 @@ public class RingsDetectAlgorithm extends AbstractAlgorithm {
private static class Traverser extends AlgoTraverser {
public Traverser(Job<Object> job, int workers) {
- super(job, "ring", workers);
+ super(job, ALGO_NAME, workers);
}
public Object rings(String sourceLabel, String sourceCLabel,
Directions dir, String label, int depth,
- long degree, long capacity, long limit,
+ long degree, long eachLimit, long limit,
boolean countOnly) {
JsonMap ringsJson = new JsonMap();
ringsJson.startObject();
@@ -100,24 +103,24 @@ public class RingsDetectAlgorithm extends AbstractAlgorithm {
}
SubGraphTraverser traverser = new SubGraphTraverser(this.graph());
- AtomicInteger count = new AtomicInteger(0);
+ AtomicLong count = new AtomicLong(0L);
this.traverse(sourceLabel, sourceCLabel, v -> {
Id source = (Id) v.id();
PathSet rings = traverser.rings(source, dir, label, depth,
- true, degree, capacity, limit);
+ true, degree, MAX_CAPACITY,
+ eachLimit);
+ assert eachLimit == NO_LIMIT || rings.size() <= eachLimit;
for (Path ring : rings) {
- Id min = null;
- for (Id id : ring.vertices()) {
- if (min == null || id.compareTo(min) < 0) {
- min = id;
- }
+ if (eachLimit == NO_LIMIT && !ring.ownedBy(source)) {
+ // Only dedup rings when each_limit!=NO_LIMIT
+ continue;
}
- if (source.equals(min)) {
- if (countOnly) {
- count.incrementAndGet();
- continue;
- }
+
+ if (count.incrementAndGet() > limit && limit != NO_LIMIT) {
+ throw new StopExecution("exceed limit %s", limit);
+ }
+ if (!countOnly) {
String ringJson = JsonUtil.toJson(ring.vertices());
synchronized (ringsJson) {
ringsJson.appendRaw(ringJson);
@@ -125,8 +128,14 @@ public class RingsDetectAlgorithm extends AbstractAlgorithm {
}
}
});
+
if (countOnly) {
- ringsJson.append(count.get());
+ long counted = count.get();
+ if (limit != NO_LIMIT && counted > limit) {
+ // The count increased by multi threads and exceed limit
+ counted = limit;
+ }
+ ringsJson.append(counted);
} else {
ringsJson.endList();
}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java
index 82294f48b..fbaca4960 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java
@@ -20,12 +20,14 @@
package com.baidu.hugegraph.job.algorithm.similarity;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.job.Job;
import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
+import com.baidu.hugegraph.job.algorithm.Consumers.StopExecution;
import com.baidu.hugegraph.schema.EdgeLabel;
import com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser;
import com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser.SimilarsMap;
@@ -35,24 +37,27 @@ import com.baidu.hugegraph.util.JsonUtil;
public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
+ public static final String ALGO_NAME = "fusiform_similarity";
+
public static final String KEY_MIN_NEIGHBORS = "min_neighbors";
public static final String KEY_MIN_SIMILARS = "min_similars";
+ public static final String KEY_TOP_SIMILARS = "top_similars";
public static final String KEY_GROUP_PROPERTY = "group_property";
public static final String KEY_MIN_GROUPS = "min_groups";
public static final int DEFAULT_MIN_NEIGHBORS = 10;
public static final int DEFAULT_MIN_SIMILARS = 6;
+ public static final int DEFAULT_TOP_SIMILARS = 0;
public static final int DEFAULT_MIN_GROUPS = 1;
- public static final long DEFAULT_LIMIT = -1L;
@Override
- public String name() {
- return "fusiform_similarity";
+ public String category() {
+ return CATEGORY_SIMI;
}
@Override
- public String category() {
- return CATEGORY_SIMI;
+ public String name() {
+ return ALGO_NAME;
}
@Override
@@ -60,11 +65,10 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
minNeighbors(parameters);
alpha(parameters);
minSimilars(parameters);
- top(parameters);
+ topSimilars(parameters);
groupProperty(parameters);
minGroups(parameters);
degree(parameters);
- capacity(parameters);
limit(parameters);
sourceLabel(parameters);
sourceCLabel(parameters);
@@ -84,11 +88,10 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
minNeighbors(parameters),
alpha(parameters),
minSimilars(parameters),
- top(parameters),
+ topSimilars(parameters),
groupProperty(parameters),
minGroups(parameters),
degree(parameters),
- capacity(parameters),
limit(parameters));
}
}
@@ -98,7 +101,7 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
return DEFAULT_MIN_NEIGHBORS;
}
int minNeighbors = parameterInt(parameters, KEY_MIN_NEIGHBORS);
- HugeTraverser.checkPositive(minNeighbors, "min neighbors");
+ HugeTraverser.checkPositive(minNeighbors, KEY_MIN_NEIGHBORS);
return minNeighbors;
}
@@ -107,7 +110,16 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
return DEFAULT_MIN_SIMILARS;
}
int minSimilars = parameterInt(parameters, KEY_MIN_SIMILARS);
- HugeTraverser.checkPositive(minSimilars, "min similars");
+ HugeTraverser.checkPositive(minSimilars, KEY_MIN_SIMILARS);
+ return minSimilars;
+ }
+
+ protected static int topSimilars(Map<String, Object> parameters) {
+ if (!parameters.containsKey(KEY_TOP_SIMILARS)) {
+ return DEFAULT_TOP_SIMILARS;
+ }
+ int minSimilars = parameterInt(parameters, KEY_TOP_SIMILARS);
+ HugeTraverser.checkNonNegative(minSimilars, KEY_TOP_SIMILARS);
return minSimilars;
}
@@ -123,7 +135,7 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
return DEFAULT_MIN_GROUPS;
}
int minGroups = parameterInt(parameters, KEY_MIN_GROUPS);
- HugeTraverser.checkPositive(minGroups, "min groups");
+ HugeTraverser.checkPositive(minGroups, KEY_MIN_GROUPS);
return minGroups;
}
@@ -139,7 +151,7 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
private static class Traverser extends AlgoTraverser {
public Traverser(Job<Object> job, int workers) {
- super(job, "fusiform", workers);
+ super(job, ALGO_NAME, workers);
}
public Object fusiformSimilars(String sourceLabel, String sourceCLabel,
@@ -147,12 +159,14 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
int minNeighbors, double alpha,
int minSimilars, long topSimilars,
String groupProperty, int minGroups,
- long degree, long capacity, long limit) {
+ long degree, long limit) {
HugeGraph graph = this.graph();
EdgeLabel edgeLabel = label == null ? null : graph.edgeLabel(label);
FusiformSimilarityTraverser traverser =
new FusiformSimilarityTraverser(graph);
+
+ AtomicLong count = new AtomicLong(0L);
JsonMap similarsJson = new JsonMap();
similarsJson.startObject();
@@ -162,16 +176,20 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
edgeLabel, minNeighbors, alpha,
minSimilars, (int) topSimilars,
groupProperty, minGroups, degree,
- capacity, NO_LIMIT, true);
+ MAX_CAPACITY, NO_LIMIT, true);
if (similars.isEmpty()) {
return;
}
String result = JsonUtil.toJson(similars.toMap());
result = result.substring(1, result.length() - 1);
synchronized (similarsJson) {
+ if (count.incrementAndGet() > limit && limit != NO_LIMIT) {
+ throw new StopExecution("exceed limit %s", limit);
+ }
similarsJson.appendRaw(result);
}
- }, null, limit);
+ });
+
similarsJson.endObject();
return similarsJson.asJson();