You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by ji...@apache.org on 2022/11/09 10:25:04 UTC
[incubator-hugegraph] 01/33: implement 8 olap algorithms (#4)
This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit 9e4e33f41ae7d879ede68461a41584adfdb76b0d
Author: Jermy Li <li...@baidu.com>
AuthorDate: Thu Apr 9 11:22:56 2020 +0800
implement 8 olap algorithms (#4)
* add olap algo api
* improve source filter
* fix louvain shaking with limit degree
* catch exception for lpa and louvain
* add 3 params for lpa: label, source_label, precision
* improve louvain node store
* remove vertices from class Community
* move showCommunity to AbstractCommAlgorithm
* add some parameters to AbstractAlgorithm
* improve louvain log
* improve clearPass and communities check
* split louvain cache
* fix degreeCentrality bug: degree is always < 500
Change-Id: I2341b981dab44f43ac50ae0f8fa5e51b7acc1b5a
---
.../com/baidu/hugegraph/api/job/AlgorithmAPI.java | 84 +++
.../java/com/baidu/hugegraph/job/AlgorithmJob.java | 71 ++
.../hugegraph/job/algorithm/AbstractAlgorithm.java | 516 +++++++++++++++
.../baidu/hugegraph/job/algorithm/Algorithm.java | 35 +
.../hugegraph/job/algorithm/AlgorithmPool.java | 71 ++
.../job/algorithm/CountEdgeAlgorithm.java | 79 +++
.../job/algorithm/CountVertexAlgorithm.java | 79 +++
.../job/algorithm/cent/AbstractCentAlgorithm.java | 113 ++++
.../cent/BetweenessCentralityAlgorithm.java | 101 +++
.../cent/ClosenessCentralityAlgorithm.java | 111 ++++
.../algorithm/cent/DegreeCentralityAlgorithm.java | 140 ++++
.../cent/EigenvectorCentralityAlgorithm.java | 100 +++
.../job/algorithm/comm/AbstractCommAlgorithm.java | 78 +++
.../algorithm/comm/ClusterCoeffcientAlgorithm.java | 70 ++
.../job/algorithm/comm/LouvainAlgorithm.java | 83 +++
.../job/algorithm/comm/LouvainTraverser.java | 715 +++++++++++++++++++++
.../hugegraph/job/algorithm/comm/LpaAlgorithm.java | 263 ++++++++
.../job/algorithm/comm/TriangleCountAlgorithm.java | 153 +++++
18 files changed, 2862 insertions(+)
diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/AlgorithmAPI.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/AlgorithmAPI.java
new file mode 100644
index 000000000..c965e02a5
--- /dev/null
+++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/AlgorithmAPI.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.api.job;
+
+import java.util.Map;
+
+import javax.inject.Singleton;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.HugeGraph;
+import com.baidu.hugegraph.api.API;
+import com.baidu.hugegraph.api.filter.StatusFilter.Status;
+import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.core.GraphManager;
+import com.baidu.hugegraph.job.AlgorithmJob;
+import com.baidu.hugegraph.job.JobBuilder;
+import com.baidu.hugegraph.server.RestServer;
+import com.baidu.hugegraph.util.E;
+import com.baidu.hugegraph.util.JsonUtil;
+import com.baidu.hugegraph.util.Log;
+import com.codahale.metrics.annotation.Timed;
+import com.google.common.collect.ImmutableMap;
+
+@Path("graphs/{graph}/jobs/algorithm")
+@Singleton
+public class AlgorithmAPI extends API {
+
+ private static final Logger LOG = Log.logger(RestServer.class);
+
+ @POST
+ @Timed
+ @Path("/{name}")
+ @Status(Status.CREATED)
+ @Consumes(APPLICATION_JSON)
+ @Produces(APPLICATION_JSON_WITH_CHARSET)
+ public Map<String, Id> post(@Context GraphManager manager,
+ @PathParam("graph") String graph,
+ @PathParam("name") String algorithm,
+ Map<String, Object> parameters) {
+ LOG.debug("Graph [{}] schedule algorithm job: {}", graph, parameters);
+ E.checkArgument(algorithm != null && !algorithm.isEmpty(),
+ "The algorithm name can't be empty");
+ if (parameters == null) {
+ parameters = ImmutableMap.of();
+ }
+ if (!AlgorithmJob.check(algorithm, parameters)) {
+ throw new NotFoundException("Not found algorithm: " + algorithm);
+ }
+
+ HugeGraph g = graph(manager, graph);
+ Map<String, Object> input = ImmutableMap.of("algorithm", algorithm,
+ "parameters", parameters);
+ JobBuilder<Object> builder = JobBuilder.of(g);
+ builder.name("algorithm:" + algorithm)
+ .input(JsonUtil.toJson(input))
+ .job(new AlgorithmJob());
+ return ImmutableMap.of("task_id", builder.schedule().id());
+ }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/AlgorithmJob.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/AlgorithmJob.java
new file mode 100644
index 000000000..7e752ac42
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/AlgorithmJob.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job;
+
+import java.util.Map;
+
+import com.baidu.hugegraph.job.algorithm.Algorithm;
+import com.baidu.hugegraph.job.algorithm.AlgorithmPool;
+import com.baidu.hugegraph.util.E;
+import com.baidu.hugegraph.util.JsonUtil;
+
+/**
+ * A job that looks up a registered algorithm by name and runs it with the
+ * parameters stored as JSON in the task input.
+ */
+public class AlgorithmJob extends Job<Object> {
+
+    public static final String TASK_TYPE = "algorithm";
+
+    /**
+     * Checks that an algorithm named {@code name} is registered and, if so,
+     * lets it validate {@code parameters}.
+     *
+     * @return false if no such algorithm is registered, true otherwise
+     *         (invalid parameters propagate whatever the algorithm's
+     *         checkParameters throws)
+     */
+    public static boolean check(String name, Map<String, Object> parameters) {
+        Algorithm found = AlgorithmPool.instance().find(name);
+        if (found != null) {
+            found.checkParameters(parameters);
+        }
+        return found != null;
+    }
+
+    @Override
+    public String type() {
+        return TASK_TYPE;
+    }
+
+    /**
+     * Decodes the task input {"algorithm": name, "parameters": {...}},
+     * resolves the algorithm and returns the result of running it.
+     */
+    @Override
+    public Object execute() throws Exception {
+        String rawInput = this.task().input();
+        E.checkArgumentNotNull(rawInput, "The input can't be null");
+        @SuppressWarnings("unchecked")
+        Map<String, Object> inputMap = JsonUtil.fromJson(rawInput, Map.class);
+
+        Object nameValue = inputMap.get("algorithm");
+        E.checkArgument(nameValue instanceof String,
+                        "Invalid algorithm name '%s'", nameValue);
+        String algorithmName = (String) nameValue;
+
+        Object paramsValue = inputMap.get("parameters");
+        E.checkArgument(paramsValue instanceof Map,
+                        "Invalid algorithm parameters '%s'", paramsValue);
+        @SuppressWarnings("unchecked")
+        Map<String, Object> params = (Map<String, Object>) paramsValue;
+
+        Algorithm algorithm = AlgorithmPool.instance().find(algorithmName);
+        E.checkArgument(algorithm != null,
+                        "There is no algorithm named '%s'", algorithmName);
+        return algorithm.call(this, params);
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
new file mode 100644
index 000000000..660ef9f8f
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
@@ -0,0 +1,516 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm;
+
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.baidu.hugegraph.HugeException;
+import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.backend.query.ConditionQuery;
+import com.baidu.hugegraph.backend.query.Query;
+import com.baidu.hugegraph.iterator.FilterIterator;
+import com.baidu.hugegraph.iterator.FlatMapperIterator;
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.testutil.Whitebox;
+import com.baidu.hugegraph.traversal.algorithm.HugeTraverser;
+import com.baidu.hugegraph.type.HugeType;
+import com.baidu.hugegraph.type.define.Directions;
+import com.baidu.hugegraph.type.define.HugeKeys;
+import com.baidu.hugegraph.util.Bytes;
+import com.baidu.hugegraph.util.E;
+import com.baidu.hugegraph.util.JsonUtil;
+
+import jersey.repackaged.com.google.common.base.Objects;
+
+@SuppressWarnings("deprecation") // StringEscapeUtils
+public abstract class AbstractAlgorithm implements Algorithm {
+
+ public static final long MAX_RESULT_SIZE = 100L * Bytes.MB;
+ public static final long MAX_QUERY_LIMIT = 10000000L; // about 10GB
+ public static final int BATCH = 500;
+
+ public static final String CATEGORY_AGGR = "aggregate";
+ public static final String CATEGORY_PATH = "path";
+ public static final String CATEGORY_RANK = "rank";
+ public static final String CATEGORY_SIMI = "similarity";
+ public static final String CATEGORY_COMM = "community";
+ public static final String CATEGORY_CENT = "centrality";
+
+ public static final String KEY_DIRECTION = "direction";
+ public static final String KEY_LABEL = "label";
+ public static final String KEY_DEPTH = "depth";
+ public static final String KEY_DEGREE = "degree";
+ public static final String KEY_SAMPLE = "sample";
+ public static final String KEY_SOURCE_SAMPLE = "source_sample";
+ public static final String KEY_SOURCE_LABEL = "source_label";
+ public static final String KEY_SOURCE_CLABEL = "source_clabel";
+ public static final String KEY_TOP = "top";
+ public static final String KEY_TIMES = "times";
+ public static final String KEY_STABLE_TIMES = "stable_times";
+ public static final String KEY_PRECISION = "precision";
+ public static final String KEY_SHOW_COMM = "show_community";
+ public static final String KEY_CLEAR = "clear";
+ public static final String KEY_CAPACITY = "capacity";
+ public static final String KEY_LIMIT = "limit";
+
+ public static final long DEFAULT_CAPACITY = 10000000L;
+ public static final long DEFAULT_LIMIT = 100L;
+ public static final long DEFAULT_DEGREE = 100L;
+ public static final long DEFAULT_SAMPLE = 1L;
+ public static final long DEFAULT_TIMES = 20L;
+ public static final long DEFAULT_STABLE_TIMES= 3L;
+ public static final double DEFAULT_PRECISION = 1.0 / 1000;
+
+ @Override
+ public void checkParameters(Map<String, Object> parameters) {
+ E.checkArgument(parameters.isEmpty(),
+ "Unnecessary parameters: %s", parameters);
+ }
+
+ protected static int depth(Map<String, Object> parameters) {
+ int depth = parameterInt(parameters, KEY_DEPTH);
+ E.checkArgument(depth > 0,
+ "The value of %s must be > 0, but got %s",
+ KEY_DEPTH, depth);
+ return depth;
+ }
+
+ protected static String edgeLabel(Map<String, Object> parameters) {
+ if (!parameters.containsKey(KEY_LABEL)) {
+ return null;
+ }
+ return parameterString(parameters, KEY_LABEL);
+ }
+
+ protected static Directions direction(Map<String, Object> parameters) {
+ Object direction = parameter(parameters, KEY_DIRECTION);
+ return parseDirection(direction);
+ }
+
+ protected static long top(Map<String, Object> parameters) {
+ if (!parameters.containsKey(KEY_TOP)) {
+ return 0L;
+ }
+ long top = parameterLong(parameters, KEY_TOP);
+ E.checkArgument(top >= 0L,
+ "The value of %s must be >= 0, but got %s",
+ KEY_TOP, top);
+ return top;
+ }
+
+ protected static long degree(Map<String, Object> parameters) {
+ if (!parameters.containsKey(KEY_DEGREE)) {
+ return DEFAULT_DEGREE;
+ }
+ long degree = parameterLong(parameters, KEY_DEGREE);
+ HugeTraverser.checkDegree(degree);
+ return degree;
+ }
+
+ protected static long capacity(Map<String, Object> parameters) {
+ if (!parameters.containsKey(KEY_CAPACITY)) {
+ return DEFAULT_CAPACITY;
+ }
+ long capacity = parameterLong(parameters, KEY_CAPACITY);
+ HugeTraverser.checkCapacity(capacity);
+ return capacity;
+ }
+
+ protected static long limit(Map<String, Object> parameters) {
+ if (!parameters.containsKey(KEY_LIMIT)) {
+ return DEFAULT_LIMIT;
+ }
+ long limit = parameterLong(parameters, KEY_LIMIT);
+ HugeTraverser.checkLimit(limit);
+ return limit;
+ }
+
+ protected static long sample(Map<String, Object> parameters) {
+ if (!parameters.containsKey(KEY_SAMPLE)) {
+ return DEFAULT_SAMPLE;
+ }
+ long sample = parameterLong(parameters, KEY_SAMPLE);
+ HugeTraverser.checkPositiveOrNoLimit(sample, KEY_SAMPLE);
+ return sample;
+ }
+
+ protected static long sourceSample(Map<String, Object> parameters) {
+ if (!parameters.containsKey(KEY_SOURCE_SAMPLE)) {
+ return HugeTraverser.NO_LIMIT;
+ }
+ long sample = parameterLong(parameters, KEY_SOURCE_SAMPLE);
+ HugeTraverser.checkPositiveOrNoLimit(sample, KEY_SOURCE_SAMPLE);
+ return sample;
+ }
+
+ protected static String sourceLabel(Map<String, Object> parameters) {
+ if (!parameters.containsKey(KEY_SOURCE_LABEL)) {
+ return null;
+ }
+ return parameterString(parameters, KEY_SOURCE_LABEL);
+ }
+
+ protected static String sourceCLabel(Map<String, Object> parameters) {
+ if (!parameters.containsKey(KEY_SOURCE_CLABEL)) {
+ return null;
+ }
+ return parameterString(parameters, KEY_SOURCE_CLABEL);
+ }
+
+ public static Object parameter(Map<String, Object> parameters, String key) {
+ Object value = parameters.get(key);
+ E.checkArgument(value != null,
+ "Expect '%s' in parameters: %s",
+ key, parameters);
+ return value;
+ }
+
+ public static String parameterString(Map<String, Object> parameters,
+ String key) {
+ Object value = parameter(parameters, key);
+ E.checkArgument(value instanceof String,
+ "Expect string value for parameter '%s': '%s'",
+ key, value);
+ return (String) value;
+ }
+
+ public static int parameterInt(Map<String, Object> parameters,
+ String key) {
+ Object value = parameter(parameters, key);
+ E.checkArgument(value instanceof Number,
+ "Expect int value for parameter '%s': '%s'",
+ key, value);
+ return ((Number) value).intValue();
+ }
+
+ public static long parameterLong(Map<String, Object> parameters,
+ String key) {
+ Object value = parameter(parameters, key);
+ E.checkArgument(value instanceof Number,
+ "Expect long value for parameter '%s': '%s'",
+ key, value);
+ return ((Number) value).longValue();
+ }
+
+ public static double parameterDouble(Map<String, Object> parameters,
+ String key) {
+ Object value = parameter(parameters, key);
+ E.checkArgument(value instanceof Number,
+ "Expect double value for parameter '%s': '%s'",
+ key, value);
+ return ((Number) value).doubleValue();
+ }
+
+ public static boolean parameterBoolean(Map<String, Object> parameters,
+ String key) {
+ Object value = parameter(parameters, key);
+ E.checkArgument(value instanceof Boolean,
+ "Expect boolean value for parameter '%s': '%s'",
+ key, value);
+ return ((Boolean) value);
+ }
+
+ public static Directions parseDirection(Object direction) {
+ if (direction.equals(Directions.BOTH.toString())) {
+ return Directions.BOTH;
+ } else if (direction.equals(Directions.OUT.toString())) {
+ return Directions.OUT;
+ } else if (direction.equals(Directions.IN.toString())) {
+ return Directions.IN;
+ } else {
+ throw new IllegalArgumentException(String.format(
+ "The value of direction must be in [OUT, IN, BOTH], " +
+ "but got '%s'", direction));
+ }
+ }
+
+ public static class AlgoTraverser extends HugeTraverser {
+
+ private final Job<Object> job;
+ protected long progress;
+
+ public AlgoTraverser(Job<Object> job) {
+ super(job.graph());
+ this.job = job;
+ }
+
+ public void updateProgress(long progress) {
+ this.job.updateProgress((int) progress);
+ }
+
+ protected Iterator<Vertex> vertices() {
+ return this.vertices(Query.NO_LIMIT);
+ }
+
+ protected Iterator<Vertex> vertices(long limit) {
+ Query query = new Query(HugeType.VERTEX);
+ query.capacity(Query.NO_CAPACITY);
+ query.limit(limit);
+ return this.graph().vertices(query);
+ }
+
+ protected Iterator<Vertex> vertices(Object label, String key,
+ Object value, long limit) {
+ Iterator<Vertex> vertices = this.vertices(label, limit);
+ if (key != null) {
+ vertices = filter(vertices, key, value);
+ }
+ return vertices;
+ }
+
+ protected Iterator<Vertex> vertices(Object label, long limit) {
+ if (label == null) {
+ return this.vertices(limit);
+ }
+ ConditionQuery query = new ConditionQuery(HugeType.VERTEX);
+ query.capacity(Query.NO_CAPACITY);
+ query.limit(limit);
+ if (label != null) {
+ query.eq(HugeKeys.LABEL, this.getVertexLabelId(label));
+ }
+ return this.graph().vertices(query);
+ }
+
+ protected Iterator<Vertex> vertices(Iterator<Object> ids) {
+ return new FlatMapperIterator<>(ids, id -> {
+ return this.graph().vertices(id);
+ });
+ }
+
+ protected Iterator<Vertex> filter(Iterator<Vertex> vertices,
+ String key, Object value) {
+ return new FilterIterator<>(vertices, vertex -> {
+ boolean matched = match(vertex, key, value);
+ if (!matched) {
+ this.updateProgress(++this.progress);
+ }
+ return matched;
+ });
+ }
+
+ protected static boolean match(Element elem, String key, Object value) {
+ Property<Object> p = elem.property(key);
+ return p.isPresent() && Objects.equal(p.value(), value);
+ }
+
+ protected Iterator<Edge> edges(Directions dir) {
+ HugeType type = dir == null ? HugeType.EDGE : dir.type();
+ Query query = new Query(type);
+ query.capacity(Query.NO_CAPACITY);
+ query.limit(Query.NO_LIMIT);
+ return this.graph().edges(query);
+ }
+
+ protected void drop(GraphTraversal<?, ? extends Element> traversal) {
+ this.execute(traversal, () -> {
+ while (traversal.hasNext()) {
+ this.updateProgress(++this.progress);
+ traversal.next().remove();
+ this.commitIfNeeded();
+ }
+ return null;
+ });
+ this.graph().tx().commit();
+ }
+
+ protected <V> V execute(GraphTraversal<?, ?> traversal,
+ Callable<V> callback) {
+ long capacity = Query.defaultCapacity(MAX_QUERY_LIMIT);
+ try {
+ return callback.call();
+ } catch (Exception e) {
+ throw new HugeException("Failed to execute algorithm", e);
+ } finally {
+ Query.defaultCapacity(capacity);
+ try {
+ traversal.close();
+ } catch (Exception e) {
+ throw new HugeException("Can't close traversal", e);
+ }
+ }
+ }
+
+ protected void commitIfNeeded() {
+ // commit if needed
+ Transaction tx = this.graph().tx();
+ Whitebox.invoke(tx.getClass(), "commitIfGtSize", tx, BATCH);
+ }
+ }
+
+ public static final class TopMap {
+
+ private final long topN;
+ private Map<Id, MutableLong> tops;
+
+ public TopMap(long topN) {
+ this.topN = topN;
+ this.tops = new HashMap<>();
+ }
+
+ public int size() {
+ return this.tops.size();
+ }
+
+ public void put(Id key, long value) {
+ this.put(key, Long.valueOf(value));
+ }
+
+ public void put(Id key, Long value) {
+ this.tops.put(key, new MutableLong(value));
+ // keep 2x buffer
+ if (this.tops.size() > this.topN * 2) {
+ this.shrinkIfNeeded(this.topN);
+ }
+ }
+
+ public Set<Map.Entry<Id, MutableLong>> entrySet() {
+ this.shrinkIfNeeded(this.topN);
+ return this.tops.entrySet();
+ }
+
+ private void shrinkIfNeeded(long limit) {
+ if (this.tops.size() >= limit && limit != HugeTraverser.NO_LIMIT) {
+ this.tops = HugeTraverser.topN(this.tops, true, limit);
+ }
+ }
+ }
+
+ public static final class JsonMap {
+
+ private final StringBuilder json;
+
+ public JsonMap() {
+ this(4 * (int) Bytes.KB);
+ }
+
+ public JsonMap(int initCapaticy) {
+ this.json = new StringBuilder(initCapaticy);
+ }
+
+ public void startObject() {
+ this.json.append('{');
+ }
+
+ public void endObject() {
+ this.deleteLastComma();
+ this.json.append('}');
+ }
+
+ public void startList() {
+ this.json.append('[');
+ }
+
+ public void endList() {
+ this.deleteLastComma();
+ this.json.append(']');
+ }
+
+ public void deleteLastComma() {
+ int last = this.json.length() - 1;
+ if (last >= 0 && this.json.charAt(last) == ',') {
+ this.json.deleteCharAt(last);
+ }
+ }
+
+ public void appendKey(String key) {
+ this.appendString(key).append(':');
+ }
+
+ public void append(long value) {
+ this.json.append(value).append(',');
+ this.checkSizeLimit();
+ }
+
+ public void append(String value) {
+ this.appendString(value).append(',');
+ this.checkSizeLimit();
+ }
+
+ public void append(Object key, long value) {
+ this.append(key.toString(), value);
+ }
+
+ public void append(String key, long value) {
+ this.appendString(key).append(':');
+ this.json.append(value).append(',');
+ this.checkSizeLimit();
+ }
+
+ public void append(Object key, Number value) {
+ this.append(key.toString(), value);
+ }
+
+ public void append(String key, Number value) {
+ this.appendString(key).append(':');
+ this.json.append(value).append(',');
+ this.checkSizeLimit();
+ }
+
+ public void append(String key, String value) {
+ this.appendString(key).append(':');
+ this.appendString(value).append(',');
+ this.checkSizeLimit();
+ }
+
+ public void appendRaw(String key, String rawJson) {
+ this.appendString(key).append(':');
+ this.json.append(rawJson).append(',');
+ this.checkSizeLimit();
+ }
+
+ public void append(Set<Entry<Id, MutableLong>> kvs) {
+ for (Map.Entry<Id, MutableLong> top : kvs) {
+ this.append(top.getKey(), top.getValue());
+ }
+ }
+
+ private StringBuilder appendString(String str) {
+ if (str.indexOf('"') >= 0) {
+ str = StringEscapeUtils.escapeJson(str);
+ }
+ return this.json.append('"').append(str).append('"');
+ }
+
+ public void checkSizeLimit() {
+ E.checkArgument(this.json.length() < MAX_RESULT_SIZE,
+ "The result size exceeds limit %s",
+ MAX_RESULT_SIZE);
+ }
+
+ public Object asJson() {
+ return JsonUtil.asJson(this.json.toString());
+ }
+ }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Algorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Algorithm.java
new file mode 100644
index 000000000..6ad200157
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Algorithm.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm;
+
+import java.util.Map;
+
+import com.baidu.hugegraph.job.Job;
+
+/**
+ * An OLAP algorithm that is registered by name and executed as a job.
+ */
+public interface Algorithm {
+
+    /** Returns the name this algorithm is registered and looked up under. */
+    String name();
+
+    /** Returns the category of this algorithm, e.g. "centrality". */
+    String category();
+
+    /**
+     * Runs the algorithm.
+     *
+     * @param job        the job executing this algorithm
+     * @param parameters the user-supplied parameters
+     * @return the algorithm result, which is returned as the job result
+     */
+    Object call(Job<Object> job, Map<String, Object> parameters);
+
+    /**
+     * Validates {@code parameters} before a job is scheduled, throwing
+     * if they are not acceptable.
+     */
+    void checkParameters(Map<String, Object> parameters);
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java
new file mode 100644
index 000000000..98f7c89dc
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AlgorithmPool.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.baidu.hugegraph.job.algorithm.cent.BetweenessCentralityAlgorithm;
+import com.baidu.hugegraph.job.algorithm.cent.ClosenessCentralityAlgorithm;
+import com.baidu.hugegraph.job.algorithm.cent.DegreeCentralityAlgorithm;
+import com.baidu.hugegraph.job.algorithm.cent.EigenvectorCentralityAlgorithm;
+import com.baidu.hugegraph.job.algorithm.comm.ClusterCoeffcientAlgorithm;
+import com.baidu.hugegraph.job.algorithm.comm.LouvainAlgorithm;
+import com.baidu.hugegraph.job.algorithm.comm.LpaAlgorithm;
+import com.baidu.hugegraph.job.algorithm.comm.TriangleCountAlgorithm;
+
+/**
+ * Registry of the built-in algorithms, keyed by Algorithm#name().
+ */
+public class AlgorithmPool {
+
+    private static final AlgorithmPool INSTANCE = new AlgorithmPool();
+
+    static {
+        INSTANCE.register(new CountVertexAlgorithm());
+        INSTANCE.register(new CountEdgeAlgorithm());
+
+        INSTANCE.register(new DegreeCentralityAlgorithm());
+        INSTANCE.register(new BetweenessCentralityAlgorithm());
+        INSTANCE.register(new ClosenessCentralityAlgorithm());
+        INSTANCE.register(new EigenvectorCentralityAlgorithm());
+
+        INSTANCE.register(new TriangleCountAlgorithm());
+        INSTANCE.register(new ClusterCoeffcientAlgorithm());
+        INSTANCE.register(new LpaAlgorithm());
+        INSTANCE.register(new LouvainAlgorithm());
+    }
+
+    private final Map<String, Algorithm> algorithms;
+
+    public AlgorithmPool() {
+        this.algorithms = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Registers {@code algo} under its name.
+     *
+     * @return null, meaning there was no previous mapping; a duplicate
+     *         name throws instead of replacing the existing algorithm
+     * @throws IllegalArgumentException if the name is already registered
+     */
+    public Algorithm register(Algorithm algo) {
+        String name = algo.name();
+        // putIfAbsent makes the duplicate check atomic. Unlike an assert,
+        // it is also enforced when assertions are disabled.
+        Algorithm existing = this.algorithms.putIfAbsent(name, algo);
+        if (existing != null) {
+            throw new IllegalArgumentException(String.format(
+                      "Algorithm '%s' has been registered", name));
+        }
+        return null;
+    }
+
+    /** Returns the algorithm registered as {@code name}, or null. */
+    public Algorithm find(String name) {
+        return this.algorithms.get(name);
+    }
+
+    public static AlgorithmPool instance() {
+        return INSTANCE;
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java
new file mode 100644
index 000000000..9fb122348
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.util.JsonUtil;
+
+/**
+ * Counts the edges of a graph per edge label, plus a "*" grand total.
+ * Accepts no parameters (inherits the default checkParameters).
+ */
+public class CountEdgeAlgorithm extends AbstractAlgorithm {
+
+    @Override
+    public String name() {
+        return "count_edge";
+    }
+
+    @Override
+    public String category() {
+        return CATEGORY_AGGR;
+    }
+
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        return new Traverser(job).count();
+    }
+
+    private static class Traverser extends AlgoTraverser {
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Scans every edge once.
+         *
+         * Progress is reported after each edge. Returns the label -> count
+         * map as JSON.
+         */
+        public Object count() {
+            Iterator<Edge> iter = this.edges(null);
+            Map<String, MutableLong> perLabel = new HashMap<>();
+            long seen = 0L;
+
+            while (iter.hasNext()) {
+                String label = iter.next().label();
+                perLabel.computeIfAbsent(label, ignored -> new MutableLong(0L))
+                        .increment();
+                seen++;
+                this.updateProgress(seen);
+            }
+            // "*" holds the total across all labels
+            perLabel.put("*", new MutableLong(seen));
+
+            return JsonUtil.asJson(perLabel);
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java
new file mode 100644
index 000000000..582e0bb69
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.util.JsonUtil;
+
+public class CountVertexAlgorithm extends AbstractAlgorithm {
+
+    /** Returns the algorithm name, {@code "count_vertex"}. */
+    @Override
+    public String name() {
+        return "count_vertex";
+    }
+
+    @Override
+    public String category() {
+        return CATEGORY_AGGR;
+    }
+
+    /**
+     * Counts every vertex of the graph, grouped by vertex label. No entry
+     * of {@code parameters} is read.
+     *
+     * @return JSON object mapping each vertex label to its count, plus the
+     *         key {@code "*"} holding the total number of vertices
+     */
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        return new Traverser(job).count();
+    }
+
+    private static class Traverser extends AlgoTraverser {
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Scans all vertices once, tallying them per label and reporting the
+         * running total as job progress after each vertex.
+         */
+        public Object count() {
+            Map<String, MutableLong> labelCounts = new HashMap<>();
+            long seen = 0L;
+
+            for (Iterator<Vertex> it = this.vertices(); it.hasNext();) {
+                String label = it.next().label();
+                labelCounts.computeIfAbsent(label, k -> new MutableLong())
+                           .increment();
+                this.updateProgress(++seen);
+            }
+            // The "*" entry carries the grand total next to per-label counts
+            labelCounts.put("*", new MutableLong(seen));
+
+            return JsonUtil.asJson(labelCounts);
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java
new file mode 100644
index 000000000..14841043a
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/AbstractCentAlgorithm.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.cent;
+
+import java.util.Map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
+import com.baidu.hugegraph.job.algorithm.comm.LpaAlgorithm;
+
+public abstract class AbstractCentAlgorithm extends AbstractAlgorithm {
+
+ protected static final String C_LABEL = LpaAlgorithm.Traverser.C_LABEL;
+
+ @Override
+ public String category() {
+ return CATEGORY_CENT;
+ }
+
+ @Override
+ public void checkParameters(Map<String, Object> parameters) {
+ depth(parameters);
+ degree(parameters);
+ sample(parameters);
+ sourceSample(parameters);
+ sourceLabel(parameters);
+ sourceCLabel(parameters);
+ top(parameters);
+ }
+
+ public static class Traverser extends AlgoTraverser {
+
+ public Traverser(Job<Object> job) {
+ super(job);
+ }
+
+ protected GraphTraversal<Vertex, Vertex> constructSource(
+ String sourceLabel,
+ long sourceSample,
+ String sourceCLabel) {
+ GraphTraversal<Vertex, Vertex> t = this.graph().traversal()
+ .withSack(1f).V();
+
+ if (sourceLabel != null) {
+ t = t.hasLabel(sourceLabel);
+ }
+
+ t = t.filter(it -> {
+ this.updateProgress(++this.progress);
+ return sourceCLabel == null ? true :
+ match(it.get(), C_LABEL, sourceCLabel);
+ });
+
+ if (sourceSample > 0L) {
+ t = t.sample((int) sourceSample);
+ }
+
+ return t;
+ }
+
+ protected GraphTraversal<Vertex, Vertex> constructPath(
+ GraphTraversal<Vertex, Vertex> t, long degree,
+ long sample, String sourceLabel, String sourceCLabel) {
+ GraphTraversal<?, Vertex> unit = constructPathUnit(degree, sample,
+ sourceLabel,
+ sourceCLabel);
+ t = t.as("v").repeat(__.local(unit).simplePath().as("v"));
+
+ return t;
+ }
+
+ protected GraphTraversal<Vertex, Vertex> constructPathUnit(
+ long degree, long sample,
+ String sourceLabel,
+ String sourceCLabel) {
+ GraphTraversal<Vertex, Vertex> unit = __.both();
+ if (sourceLabel != null) {
+ unit = unit.hasLabel(sourceLabel);
+ }
+ if (sourceCLabel != null) {
+ unit = unit.has(C_LABEL, sourceCLabel);
+ }
+ if (degree != NO_LIMIT) {
+ unit = unit.limit(degree);
+ }
+ if (sample > 0L) {
+ unit = unit.sample((int) sample);
+ }
+ return unit;
+ }
+ }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java
new file mode 100644
index 000000000..ae1b8bb74
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.cent;
+
+import java.util.Map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
+import org.apache.tinkerpop.gremlin.process.traversal.Pop;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.structure.Column;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.baidu.hugegraph.job.Job;
+
+public class BetweenessCentralityAlgorithm extends AbstractCentAlgorithm {
+
+    /** Returns the algorithm name, {@code "betweeness_centrality"}. */
+    @Override
+    public String name() {
+        return "betweeness_centrality";
+    }
+
+    /**
+     * Runs the betweenness computation using the depth, degree, sample,
+     * source label/sample/c-label and top parameters.
+     */
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        Traverser traverser = new Traverser(job);
+        return traverser.betweenessCentrality(depth(parameters),
+                                              degree(parameters),
+                                              sample(parameters),
+                                              sourceLabel(parameters),
+                                              sourceSample(parameters),
+                                              sourceCLabel(parameters),
+                                              top(parameters));
+    }
+
+    private static class Traverser extends AbstractCentAlgorithm.Traverser {
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Expands simple paths of up to {@code depth} hops from the source
+         * vertices and counts, per vertex id, how many retained paths
+         * contain that vertex (endpoints included).
+         * <p>
+         * A path is retained only if its vertex count equals that of the
+         * first path stored in the {@code "triples"} side-effect for the same
+         * (first, last) endpoint pair. Presumably that first path is a
+         * shortest one - NOTE(review): depends on traversal order, confirm.
+         *
+         * @param degree max neighbours per hop, or {@code NO_LIMIT}
+         * @param topN   keep only the {@code topN} largest counts; 0 keeps all
+         * @return result of executing the traversal: a vertex-id to count map
+         *         ordered by count, descending
+         */
+        public Object betweenessCentrality(int depth,
+                                           long degree,
+                                           long sample,
+                                           String sourceLabel,
+                                           long sourceSample,
+                                           String sourceCLabel,
+                                           long topN) {
+            assert depth > 0;
+            // constructPathUnit() explicitly supports degree == NO_LIMIT
+            assert degree > 0L || degree == NO_LIMIT;
+            assert topN >= 0L;
+
+            GraphTraversal<Vertex, Vertex> t = constructSource(sourceLabel,
+                                                               sourceSample,
+                                                               sourceCLabel);
+            t = constructPath(t, degree, sample, sourceLabel, sourceCLabel);
+            t = t.emit().until(__.loops().is(P.gte(depth)));
+
+            // Keep a path only when its length (z) equals the length recorded
+            // for the first path seen between the same endpoints (x, y)
+            @SuppressWarnings({ "unchecked", "deprecation" })
+            GraphTraversal<Vertex, Vertex> tf = t.filter(
+                __.project("x","y","z")
+                  .by(__.select(Pop.first, "v").id())
+                  .by(__.select(Pop.last, "v").id())
+                  .by(__.select(Pop.all, "v").count(Scope.local))
+                  .as("triple")
+                  .coalesce(__.select("x","y").as("a")
+                              .select("triples").unfold().as("t")
+                              .select("x","y").where(P.eq("a")).select("t"),
+                            __.store("triples"))
+                  .select("z").as("length")
+                  .select("triple").select("z").where(P.eq("length")));
+
+            GraphTraversal<Vertex, ?> tg = tf.select(Pop.all, "v")
+                                             .unfold().id()
+                                             .groupCount().order(Scope.local)
+                                             .by(Column.values, Order.desc);
+            GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tg :
+                                               tg.limit(Scope.local, topN);
+
+            return this.execute(tLimit, () -> tLimit.next());
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java
new file mode 100644
index 000000000..d890db808
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.cent;
+
+import java.util.Map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Operator;
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
+import org.apache.tinkerpop.gremlin.process.traversal.Pop;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.structure.Column;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.baidu.hugegraph.job.Job;
+
+public class ClosenessCentralityAlgorithm extends AbstractCentAlgorithm {
+
+    public static final long DEFAULT_DEGREE = 100L;
+    public static final long DEFAULT_SAMPLE = 1L;
+
+    /** Returns the algorithm name, {@code "closeness_centrality"}. */
+    @Override
+    public String name() {
+        return "closeness_centrality";
+    }
+
+    /**
+     * {@link #call} reads depth, degree, sample, source label/sample/c-label
+     * and top, so all of them are checked up front. Previously only depth
+     * was checked, unlike the sibling path-based centrality algorithms.
+     */
+    @Override
+    public void checkParameters(Map<String, Object> parameters) {
+        super.checkParameters(parameters);
+    }
+
+    /**
+     * Runs the closeness computation using the depth, degree, sample,
+     * source label/sample/c-label and top parameters.
+     */
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        Traverser traverser = new Traverser(job);
+        return traverser.closenessCentrality(depth(parameters),
+                                             degree(parameters),
+                                             sample(parameters),
+                                             sourceLabel(parameters),
+                                             sourceSample(parameters),
+                                             sourceCLabel(parameters),
+                                             top(parameters));
+    }
+
+    private static class Traverser extends AbstractCentAlgorithm.Traverser {
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Expands simple paths of up to {@code depth} hops from the source
+         * vertices. It keeps paths whose vertex count equals that of the
+         * first path recorded for the same (first, last) endpoint pair. For
+         * each source vertex id it sums {@code sack / pathVertexCount}; the
+         * sack starts at {@code 1f} (see constructSource).
+         *
+         * @param degree max neighbours per hop, or {@code NO_LIMIT}
+         * @param topN   keep only the {@code topN} largest scores; 0 keeps all
+         * @return result of executing the traversal: a source-id to score map
+         *         ordered by score, descending
+         */
+        public Object closenessCentrality(int depth,
+                                          long degree,
+                                          long sample,
+                                          String sourceLabel,
+                                          long sourceSample,
+                                          String sourceCLabel,
+                                          long topN) {
+            assert depth > 0;
+            // constructPathUnit() explicitly supports degree == NO_LIMIT
+            assert degree > 0L || degree == NO_LIMIT;
+            assert topN >= 0L;
+
+            GraphTraversal<Vertex, Vertex> t = constructSource(sourceLabel,
+                                                               sourceSample,
+                                                               sourceCLabel);
+            t = constructPath(t, degree, sample, sourceLabel, sourceCLabel);
+            t = t.emit().until(__.loops().is(P.gte(depth)));
+
+            // Keep a path only when its length (z) equals the length recorded
+            // for the first path seen between the same endpoints (x, y)
+            @SuppressWarnings({ "unchecked", "deprecation" })
+            GraphTraversal<Vertex, Vertex> tf = t.filter(
+                __.project("x","y","z")
+                  .by(__.select(Pop.first, "v").id())
+                  .by(__.select(Pop.last, "v").id())
+                  .by(__.select(Pop.all, "v").count(Scope.local))
+                  .as("triple")
+                  .coalesce(__.select("x","y").as("a")
+                              .select("triples").unfold().as("t")
+                              .select("x","y").where(P.eq("a")).select("t"),
+                            __.store("triples"))
+                  .select("z").as("length")
+                  .select("triple").select("z").where(P.eq("length")));
+
+            GraphTraversal<Vertex, ?> tg;
+            tg = tf.group().by(__.select(Pop.first, "v").id())
+                   .by(__.select(Pop.all, "v").count(Scope.local)
+                         .sack(Operator.div).sack().sum())
+                   .order(Scope.local).by(Column.values, Order.desc);
+            GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tg :
+                                               tg.limit(Scope.local, topN);
+
+            return this.execute(tLimit, () -> tLimit.next());
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java
new file mode 100644
index 000000000..81bd33672
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.cent;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.structure.HugeEdge;
+import com.baidu.hugegraph.type.define.Directions;
+
+public class DegreeCentralityAlgorithm extends AbstractCentAlgorithm {
+
+    /** Returns the algorithm name, {@code "degree_centrality"}. */
+    @Override
+    public String name() {
+        return "degree_centrality";
+    }
+
+    /**
+     * Only direction and top are read by {@link #call}, so only those are
+     * checked; the path parameters checked by the parent do not apply.
+     */
+    @Override
+    public void checkParameters(Map<String, Object> parameters) {
+        direction(parameters);
+        top(parameters);
+    }
+
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        Traverser traverser = new Traverser(job);
+        return traverser.degreeCentrality(direction(parameters),
+                                          top(parameters));
+    }
+
+    private static class Traverser extends AlgoTraverser {
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Computes vertex degrees for the given direction.
+         * <p>
+         * For {@code null} or BOTH, every vertex is visited and its
+         * {@code bothE()} edges are counted. For OUT/IN, all edges in that
+         * direction are scanned once, and runs of consecutive edges with the
+         * same owner vertex are summed. This assumes edges arrive grouped by
+         * owner vertex (NOTE(review): confirm for each backend). In this
+         * mode, vertices without such edges do not appear in the result.
+         *
+         * @param topN keep only the {@code topN} largest degrees; 0 keeps all
+         * @return JSON object mapping vertex id to degree
+         */
+        public Object degreeCentrality(Directions direction, long topN) {
+            assert topN >= 0L;
+            if (direction == null || direction == Directions.BOTH) {
+                return this.degreeCentrality(topN);
+            }
+            assert direction == Directions.OUT || direction == Directions.IN;
+
+            Iterator<Edge> edges = this.edges(direction);
+
+            JsonMap degrees = new JsonMap();
+            TopMap tops = new TopMap(topN);
+            Id vertex = null;
+            long degree = 0L;
+            long total = 0L;
+
+            degrees.startObject();
+            while (edges.hasNext()) {
+                HugeEdge edge = (HugeEdge) edges.next();
+                this.updateProgress(++total);
+
+                Id source = edge.ownerVertex().id();
+                if (source.equals(vertex)) {
+                    // Same owner as the previous edge: extend the current run
+                    degree++;
+                    continue;
+                }
+                // Owner changed: flush the finished run, then start a new one
+                if (vertex != null) {
+                    recordDegree(degrees, tops, topN, vertex, degree);
+                }
+                vertex = source;
+                degree = 1L;
+            }
+            if (vertex != null) {
+                recordDegree(degrees, tops, topN, vertex, degree);
+            }
+
+            // Only populated when topN > 0
+            if (tops.size() > 0) {
+                degrees.append(tops.entrySet());
+            }
+            degrees.endObject();
+
+            return degrees.asJson();
+        }
+
+        /**
+         * Counts {@code bothE()} edges of every vertex, issuing one count
+         * traversal per vertex.
+         */
+        protected Object degreeCentrality(long topN) {
+            assert topN >= 0L;
+            long total = 0L;
+            JsonMap degrees = new JsonMap();
+            TopMap tops = new TopMap(topN);
+
+            GraphTraversalSource traversal = this.graph().traversal();
+            Iterator<Vertex> vertices = this.vertices();
+
+            degrees.startObject();
+            while (vertices.hasNext()) {
+                Vertex source = vertices.next();
+                this.updateProgress(++total);
+
+                Long degree = traversal.V(source).bothE().count().next();
+                if (topN <= 0L) {
+                    degrees.append(source.id(), degree);
+                } else {
+                    tops.put((Id) source.id(), degree);
+                }
+            }
+
+            if (tops.size() > 0) {
+                degrees.append(tops.entrySet());
+            }
+            degrees.endObject();
+
+            return degrees.asJson();
+        }
+
+        /**
+         * Writes one vertex's degree directly into the output when no top-N
+         * limit is set; otherwise offers it to the top-N collector.
+         */
+        private static void recordDegree(JsonMap degrees, TopMap tops,
+                                         long topN, Id vertex, long degree) {
+            if (topN <= 0L) {
+                degrees.append(vertex, degree);
+            } else {
+                tops.put(vertex, degree);
+            }
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java
new file mode 100644
index 000000000..d87fc7931
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.cent;
+
+import java.util.Map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.structure.Column;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.baidu.hugegraph.job.Job;
+
+public class EigenvectorCentralityAlgorithm extends AbstractCentAlgorithm {
+
+    public static final long DEFAULT_DEGREE = 100L;
+    public static final long DEFAULT_SAMPLE = 1L;
+
+    /** Returns the algorithm name, {@code "eigenvector_centrality"}. */
+    @Override
+    public String name() {
+        return "eigenvector_centrality";
+    }
+
+    /**
+     * Runs the eigenvector-style computation using the depth, degree,
+     * sample, source label/sample/c-label and top parameters.
+     */
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        Traverser traverser = new Traverser(job);
+        return traverser.eigenvectorCentrality(depth(parameters),
+                                               degree(parameters),
+                                               sample(parameters),
+                                               sourceLabel(parameters),
+                                               sourceSample(parameters),
+                                               sourceCLabel(parameters),
+                                               top(parameters));
+    }
+
+    private static class Traverser extends AbstractCentAlgorithm.Traverser {
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Runs {@code depth} rounds from the source vertices. In each round,
+         * the id of the traverser's current vertex is counted into the
+         * side-effect {@code "m"}. The traverser then moves through
+         * constructPathUnit (degree limit, sampling), and non-simple paths
+         * are dropped. Vertices reached only in the final round are
+         * therefore never counted.
+         *
+         * @param degree max neighbours per hop, or {@code NO_LIMIT}
+         * @param topN   keep only the {@code topN} largest counts; 0 keeps all
+         * @return result of executing the traversal: the {@code "m"} map of
+         *         vertex id to count, ordered by count, descending
+         */
+        public Object eigenvectorCentrality(int depth,
+                                            long degree,
+                                            long sample,
+                                            String sourceLabel,
+                                            long sourceSample,
+                                            String sourceCLabel,
+                                            long topN) {
+            assert depth > 0;
+            // constructPathUnit() explicitly supports degree == NO_LIMIT
+            assert degree > 0L || degree == NO_LIMIT;
+            assert topN >= 0L;
+
+            // TODO: support parameters: Directions dir, String label
+            /*
+             * g.V().repeat(groupCount('m').by(id)
+             *                .local(both().limit(50).sample(1))
+             *                .simplePath())
+             *      .times(4).cap('m')
+             *      .order(local).by(values, desc)
+             *      .limit(local, 100)
+             */
+
+            GraphTraversal<Vertex, Vertex> t = constructSource(sourceLabel,
+                                                               sourceSample,
+                                                               sourceCLabel);
+            GraphTraversal<?, Vertex> unit = constructPathUnit(degree, sample,
+                                                               sourceLabel,
+                                                               sourceCLabel);
+            t = t.repeat(__.groupCount("m").by(T.id)
+                           .local(unit).simplePath()).times(depth);
+
+            GraphTraversal<Vertex, Object> tCap;
+            tCap = t.cap("m").order(Scope.local).by(Column.values, Order.desc);
+            GraphTraversal<Vertex, ?> tLimit = topN <= 0L ? tCap :
+                                               tCap.limit(Scope.local, topN);
+
+            return this.execute(tLimit, () -> tLimit.next());
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/AbstractCommAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/AbstractCommAlgorithm.java
new file mode 100644
index 000000000..74b884a06
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/AbstractCommAlgorithm.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.comm;
+
+import java.util.Map;
+
+import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
+import com.baidu.hugegraph.traversal.algorithm.HugeTraverser;
+import com.baidu.hugegraph.util.E;
+
+public abstract class AbstractCommAlgorithm extends AbstractAlgorithm {
+
+ private static final int MAX_TIMES = 2048;
+
+ @Override
+ public String category() {
+ return CATEGORY_COMM;
+ }
+
+ protected static int times(Map<String, Object> parameters) {
+ if (!parameters.containsKey(KEY_TIMES)) {
+ return (int) DEFAULT_TIMES;
+ }
+ int times = parameterInt(parameters, KEY_TIMES);
+ HugeTraverser.checkPositiveOrNoLimit(times, KEY_TIMES);
+ E.checkArgument(times <= MAX_TIMES,
+ "The maximum number of iterations is %s, but got %s",
+ MAX_TIMES, times);
+ return times;
+ }
+
+ protected static int stableTimes(Map<String, Object> parameters) {
+ if (!parameters.containsKey(KEY_STABLE_TIMES)) {
+ return (int) DEFAULT_STABLE_TIMES;
+ }
+ int times = parameterInt(parameters, KEY_STABLE_TIMES);
+ HugeTraverser.checkPositiveOrNoLimit(times, KEY_STABLE_TIMES);
+ E.checkArgument(times <= MAX_TIMES,
+ "The maximum number of stable iterations is %s, " +
+ "but got %s", MAX_TIMES, times);
+ return times;
+ }
+
+ protected static double precision(Map<String, Object> parameters) {
+ if (!parameters.containsKey(KEY_PRECISION)) {
+ return DEFAULT_PRECISION;
+ }
+ double precision = parameterDouble(parameters, KEY_PRECISION);
+ E.checkArgument(0d < precision && precision < 1d,
+ "The %s parameter must be in range(0,1), but got: %s",
+ KEY_PRECISION, precision);
+ return precision;
+ }
+
+ protected static String showCommunity(Map<String, Object> parameters) {
+ if (!parameters.containsKey(KEY_SHOW_COMM)) {
+ return null;
+ }
+ return parameterString(parameters, KEY_SHOW_COMM);
+ }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java
new file mode 100644
index 000000000..cc893fc1f
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.comm;
+
+import java.util.Map;
+
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.type.define.Directions;
+import com.baidu.hugegraph.util.InsertionOrderUtil;
+
+public class ClusterCoeffcientAlgorithm extends AbstractCommAlgorithm {
+
+    /** Returns the algorithm name, {@code "cluster_coeffcient"}. */
+    @Override
+    public String name() {
+        return "cluster_coeffcient";
+    }
+
+    /** Checks the direction and degree parameters read by {@link #call}. */
+    @Override
+    public void checkParameters(Map<String, Object> parameters) {
+        direction(parameters);
+        degree(parameters);
+    }
+
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        Traverser traverser = new Traverser(job);
+        return traverser.clusterCoeffcient(direction(parameters),
+                                           degree(parameters));
+    }
+
+    private static class Traverser extends TriangleCountAlgorithm.Traverser {
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Takes the triangle-count results, removes the triangle and triad
+         * counters, and replaces them with
+         * {@code "cluster_coeffcient" = triangles / triads}. The value is 0
+         * when there are no triads. Any other entries are kept in order.
+         */
+        public Object clusterCoeffcient(Directions direction, long degree) {
+            Map<String, Long> counts = this.triangles(direction, degree);
+            // Insertion-ordered copy, so the counters can be removed from it
+            Map<String, Long> stats = InsertionOrderUtil.newMap(counts);
+
+            long triangles = stats.remove(KEY_TRIANGLES);
+            long triads = stats.remove(KEY_TRIADS);
+            assert triangles <= triads;
+
+            double coeffcient = 0d;
+            if (triads != 0L) {
+                coeffcient = (double) triangles / triads;
+            }
+
+            // The same map also carries the Double-valued coefficient entry
+            @SuppressWarnings({ "unchecked", "rawtypes" })
+            Map<String, Double> output = (Map) stats;
+            output.put("cluster_coeffcient", coeffcient);
+
+            return stats;
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java
new file mode 100644
index 000000000..3f6de63e8
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.comm;
+
+import java.util.Map;
+
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.util.E;
+
+public class LouvainAlgorithm extends AbstractCommAlgorithm {
+
+    /** Returns the algorithm name, {@code "louvain"}. */
+    @Override
+    public String name() {
+        return "louvain";
+    }
+
+    /** Reads every parameter {@link #call} may use. */
+    @Override
+    public void checkParameters(Map<String, Object> parameters) {
+        times(parameters);
+        stableTimes(parameters);
+        precision(parameters);
+        degree(parameters);
+        sourceLabel(parameters);
+        sourceCLabel(parameters);
+        showCommunity(parameters);
+        clearPass(parameters);
+    }
+
+    /**
+     * Performs one of three operations on a {@link LouvainTraverser}:
+     * {@code clearPass} when {@code KEY_CLEAR} is given, otherwise
+     * {@code showCommunity} when {@code KEY_SHOW_COMM} is given, otherwise a
+     * full {@code louvain} run.
+     * <p>
+     * On any failure the graph transaction is rolled back and the original
+     * failure is rethrown. If the rollback itself fails, that error is
+     * attached as a suppressed exception instead of masking the original.
+     */
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        String label = sourceLabel(parameters);
+        String clabel = sourceCLabel(parameters);
+        long degree = degree(parameters);
+
+        LouvainTraverser traverser = new LouvainTraverser(job, degree,
+                                                          label, clabel);
+        Long clearPass = clearPass(parameters);
+        String showComm = showCommunity(parameters);
+        try {
+            if (clearPass != null) {
+                return traverser.clearPass(clearPass.intValue());
+            } else if (showComm != null) {
+                return traverser.showCommunity(showComm);
+            } else {
+                return traverser.louvain(times(parameters),
+                                         stableTimes(parameters),
+                                         precision(parameters));
+            }
+        } catch (Throwable e) {
+            try {
+                job.graph().tx().rollback();
+            } catch (Throwable rollbackError) {
+                // Keep the root cause; record the rollback failure alongside
+                e.addSuppressed(rollbackError);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Reads the {@code KEY_CLEAR} pass number. Returns null when absent and
+     * accepts values {@code >= 0} or {@code -1}.
+     */
+    protected static Long clearPass(Map<String, Object> parameters) {
+        if (!parameters.containsKey(KEY_CLEAR)) {
+            return null;
+        }
+        long pass = parameterLong(parameters, KEY_CLEAR);
+        // TODO: change to checkNonNegative()
+        E.checkArgument(pass >= 0 || pass == -1,
+                        "The %s parameter must be >= 0 or == -1, but got %s",
+                        KEY_CLEAR, pass);
+        return pass;
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
new file mode 100644
index 000000000..0b3d674aa
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
@@ -0,0 +1,715 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.comm;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.MutableFloat;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.backend.id.IdGenerator;
+import com.baidu.hugegraph.exception.ExistedException;
+import com.baidu.hugegraph.iterator.ListIterator;
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
+import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm.AlgoTraverser;
+import com.baidu.hugegraph.schema.SchemaLabel;
+import com.baidu.hugegraph.schema.SchemaManager;
+import com.baidu.hugegraph.schema.VertexLabel;
+import com.baidu.hugegraph.structure.HugeEdge;
+import com.baidu.hugegraph.structure.HugeVertex;
+import com.baidu.hugegraph.type.define.Directions;
+import com.baidu.hugegraph.util.Log;
+import com.google.common.collect.ImmutableMap;
+
+public class LouvainTraverser extends AlgoTraverser {
+
+ public static final String C_PASS = "c_pass-";
+ public static final String C_KIN = "c_kin";
+ public static final String C_WEIGHT = "c_weight";
+ public static final String C_MEMBERS = "c_members";
+
+ public static final String C_LABEL = LpaAlgorithm.Traverser.C_LABEL;
+
+ private static final long LIMIT = AbstractAlgorithm.MAX_QUERY_LIMIT;
+
+ private static final Logger LOG = Log.logger(LouvainTraverser.class);
+
+ private final GraphTraversalSource g;
+ private final long m;
+ private final String sourceLabel;
+ private final String sourceCLabel;
+ private final long degree;
+ private final Cache cache;
+
+ private String passLabel;
+
+ public LouvainTraverser(Job<Object> job, long degree,
+ String sourceLabel, String sourceCLabel) {
+ super(job);
+ this.g = this.graph().traversal();
+ this.m = this.g.E().count().next();
+ this.sourceLabel = sourceLabel;
+ this.sourceCLabel = sourceCLabel;
+ this.degree = degree;
+ this.passLabel = "";
+
+ this.cache = new Cache();
+ }
+
+ @SuppressWarnings("unused")
+ private Id genId2(int pass, Id cid) {
+ // gen id for merge-community vertex
+ String id = cid.toString();
+ if (pass == 0) {
+ // conncat pass with cid
+ id = pass + "~" + id;
+ } else {
+ // replace last pass with current pass
+ String lastPass = String.valueOf(pass - 1);
+ assert id.startsWith(lastPass);
+ id = id.substring(lastPass.length());
+ id = pass + id;
+ }
+ return IdGenerator.of(id);
+ }
+
+ private void defineSchemaOfPk() {
+ String label = this.labelOfPassN(0);
+ if (this.graph().existsVertexLabel(label) ||
+ this.graph().existsEdgeLabel(label)) {
+ throw new IllegalArgumentException(
+ "Please clear historical results before proceeding");
+ }
+
+ SchemaManager schema = this.graph().schema();
+ schema.propertyKey(C_KIN).asInt()
+ .ifNotExist().create();
+ schema.propertyKey(C_MEMBERS).valueSet().asText()
+ .ifNotExist().create();
+ schema.propertyKey(C_WEIGHT).asFloat()
+ .ifNotExist().create();
+ }
+
+ private void defineSchemaOfPassN(int pass) {
+ this.passLabel = labelOfPassN(pass);
+
+ SchemaManager schema = this.graph().schema();
+ try {
+ schema.vertexLabel(this.passLabel).useCustomizeStringId()
+ .properties(C_KIN, C_MEMBERS)
+ .nullableKeys(C_KIN, C_MEMBERS)
+ .create();
+ schema.edgeLabel(this.passLabel)
+ .sourceLabel(this.passLabel)
+ .targetLabel(this.passLabel)
+ .properties(C_WEIGHT)
+ .create();
+ } catch (ExistedException e) {
+ throw new IllegalArgumentException(
+ "Please clear historical results before proceeding", e);
+ }
+ }
+
+ private List<String> cpassEdgeLabels() {
+ List<String> names = new ArrayList<>();
+ for (SchemaLabel label : this.graph().schema().getEdgeLabels()) {
+ String name = label.name();
+ if (name.startsWith(C_PASS)) {
+ names.add(name);
+ }
+ }
+ return names;
+ }
+
+ private List<String> cpassVertexLabels() {
+ List<String> names = new ArrayList<>();
+ for (SchemaLabel label : this.graph().schema().getVertexLabels()) {
+ String name = label.name();
+ if (name.startsWith(C_PASS)) {
+ names.add(name);
+ }
+ }
+ return names;
+ }
+
+ private String labelOfPassN(int n) {
+ return C_PASS + n;
+ }
+
+ private float weightOfEdge(Edge e) {
+ if (e.label().startsWith(C_PASS)) {
+ assert e.property(C_WEIGHT).isPresent();
+ return e.value(C_WEIGHT);
+ } else if (e.property(C_WEIGHT).isPresent()) {
+ return e.value(C_WEIGHT);
+ }
+ return 1f;
+ }
+
+ private float weightOfEdges(List<Edge> edges) {
+ float weight = 0f;
+ for (Edge edge : edges) {
+ weight += weightOfEdge(edge);
+ }
+ return weight;
+ }
+
+ private Vertex newCommunityNode(Id cid, int kin, List<String> members) {
+ assert !members.isEmpty() : members;
+ return this.graph().addVertex(T.label, this.passLabel, T.id, cid,
+ C_KIN, kin, C_MEMBERS, members);
+ }
+
+ private Vertex makeCommunityNode(Id cid) {
+ VertexLabel vl = this.graph().vertexLabel(this.passLabel);
+ return new HugeVertex(this.graph(), cid, vl);
+ }
+
+ private Edge newCommunityEdge(Vertex source, Vertex target, float weight) {
+ return source.addEdge(this.passLabel, target, C_WEIGHT, weight);
+ }
+
+ private void insertNewCommunity(int pass, Id cid, int kin,
+ List<String> members,
+ Map<Id, MutableInt> cedges) {
+ // create backend vertex if it's the first time
+ Id vid = this.cache.genId(pass, cid);
+ Vertex node = this.newCommunityNode(vid, kin, members);
+ commitIfNeeded();
+ // update backend vertex edges
+ for (Map.Entry<Id, MutableInt> e : cedges.entrySet()) {
+ float weight = e.getValue().floatValue();
+ vid = this.cache.genId(pass, e.getKey());
+ Vertex targetV = this.makeCommunityNode(vid);
+ this.newCommunityEdge(node, targetV, weight);
+ commitIfNeeded();
+ }
+ LOG.debug("Add new comm: {} kin={} size={}", node, kin, members.size());
+ }
+
+ private boolean needSkipVertex(int pass, Vertex v) {
+ // skip the old intermediate data when first pass
+ String label = v.label();
+ if (label.startsWith(C_PASS)) {
+ if (pass == 0) {
+ return true;
+ }
+ String lastPassLabel = labelOfPassN(pass - 1);
+ if (!label.equals(lastPassLabel)) {
+ return true;
+ }
+ }
+ // skip the vertex with unmatched clabel
+ if (this.sourceCLabel != null &&
+ !match(v, C_LABEL, this.sourceCLabel)) {
+ return true;
+ }
+ return false;
+ }
+
+ private Iterator<Vertex> sourceVertices(int pass) {
+ if (pass > 0) {
+ // all vertices of merged community
+ String lastPassLabel = labelOfPassN(pass - 1);
+ return this.vertices(lastPassLabel, LIMIT);
+ } else {
+ assert pass == 0;
+ // all vertices at the first time
+ return this.vertices(this.sourceLabel, LIMIT);
+ }
+ }
+
+ private List<Edge> neighbors(Id vid) {
+ Iterator<Edge> nbs = this.edgesOfVertex(vid, Directions.BOTH,
+ (Id) null, this.degree);
+ @SuppressWarnings("resource")
+ ListIterator<Edge> list = new ListIterator<>(LIMIT, nbs);
+ return (List<Edge>) list.list();
+ }
+
+ private float weightOfVertex(Vertex v, List<Edge> edges) {
+ Float value = this.cache.vertexWeight((Id) v.id());
+ if (value != null) {
+ return value;
+ }
+ if (edges == null) {
+ edges = neighbors((Id) v.id());
+ }
+ float weight = weightOfEdges(edges);
+ this.cache.vertexWeight((Id) v.id(), weight);
+ return weight;
+ }
+
+ private int kinOfVertex(Vertex v) {
+ if (v.label().startsWith(C_PASS) && v.property(C_KIN).isPresent()) {
+ return v.value(C_KIN);
+ }
+ return 0;
+ }
+
+ private Id cidOfVertex(Vertex v) {
+ Id vid = (Id) v.id();
+ Community c = this.cache.vertex2Community(vid);
+ return c != null ? c.cid : vid;
+ }
+
+ // 1: wrap original vertex as community node
+ // 2: add original vertices to community node,
+ // and save as community vertex when merge()
+ // 3: wrap community vertex as community node,
+ // and repeat step 2 and step 3.
+ private Community wrapCommunity(Vertex otherV) {
+ Id vid = (Id) otherV.id();
+ Community comm = this.cache.vertex2Community(vid);
+ if (comm != null) {
+ return comm;
+ }
+
+ comm = new Community(vid);
+ comm.add(this, otherV, null); // will traverse the neighbors of otherV
+ this.cache.vertex2Community(vid, comm);
+ return comm;
+ }
+
+ private Collection<Pair<Community, MutableInt>> nbCommunities(
+ int pass,
+ List<Edge> edges) {
+ // comms is a map of cid:[community,weight]
+ Map<Id, Pair<Community, MutableInt>> comms = new HashMap<>();
+ for (Edge edge : edges) {
+ Vertex otherV = ((HugeEdge) edge).otherVertex();
+ if (needSkipVertex(pass, otherV)) {
+ // skip the old intermediate data, or filter clabel
+ continue;
+ }
+ Community c = wrapCommunity(otherV);
+ if (!comms.containsKey(c.cid)) {
+ comms.put(c.cid, Pair.of(c, new MutableInt(0)));
+ }
+ // calc weight between source vertex and neighbor community
+ comms.get(c.cid).getRight().add(2 * weightOfEdge(edge));
+ }
+ return comms.values();
+ }
+
+ private void moveCommunity(Vertex v, List<Edge> nbs, Community newC) {
+ Id vid = (Id) v.id();
+
+ // remove v from old community
+ Community oldC = this.cache.vertex2Community(vid);
+ if (oldC != null) {
+ oldC.remove(this, v, nbs);
+ }
+
+ // add v to new community
+ newC.add(this, v, nbs);
+ LOG.debug("Move {} to comm: {}", v, newC);
+
+ // update community of v
+ this.cache.vertex2Community(vid, newC);
+ }
+
+ private double moveCommunities(int pass) {
+ Iterator<Vertex> vertices = this.sourceVertices(pass);
+
+ // shuffle
+ //r = r.order().by(shuffle);
+
+ long total = 0L;
+ long moved = 0L;
+ while (vertices.hasNext()) {
+ this.updateProgress(++this.progress);
+ Vertex v = vertices.next();
+ if (needSkipVertex(pass, v)) {
+ // skip the old intermediate data, or filter clabel
+ continue;
+ }
+ total++;
+ Id cid = cidOfVertex(v);
+ List<Edge> nbs = neighbors((Id) v.id());
+ double ki = kinOfVertex(v) + weightOfVertex(v, nbs);
+ // update community of v if △Q changed
+ double maxDeltaQ = 0d;
+ Community bestComm = null;
+ // list all neighbor communities of v
+ for (Pair<Community, MutableInt> nbc : nbCommunities(pass, nbs)) {
+ // △Q = (Ki_in - Ki * Etot / m) / 2m
+ Community otherC = nbc.getLeft();
+ // weight between c and otherC
+ double kiin = nbc.getRight().floatValue();
+ // weight of otherC
+ int tot = otherC.kin() + otherC.kout();
+ if (cid.equals(otherC.cid)) {
+ tot -= ki;
+ assert tot >= 0;
+ // expect tot >= 0, but may be something wrong?
+ if (tot < 0) {
+ tot = 0;
+ }
+ }
+ double deltaQ = kiin - ki * tot / this.m;
+ if (deltaQ > maxDeltaQ) {
+ // TODO: cache otherC for neighbors the same community
+ maxDeltaQ = deltaQ;
+ bestComm = otherC;
+ }
+ }
+ if (maxDeltaQ > 0d && !cid.equals(bestComm.cid)) {
+ moved++;
+ // move v to the community of maxQ neighbor
+ moveCommunity(v, nbs, bestComm);
+ }
+ }
+
+ // maybe always shocking when set degree limit
+ return total == 0L ? 0d : (double) moved / total;
+ }
+
+ private void mergeCommunities(int pass) {
+ // merge each community as a vertex
+ Collection<Pair<Community, Set<Id>>> comms = this.cache.communities();
+ this.cache.resetVertexWeight();
+ for (Pair<Community, Set<Id>> pair : comms) {
+ Community c = pair.getKey();
+ if (c.empty()) {
+ continue;
+ }
+ // update kin and edges between communities
+ int kin = c.kin();
+ Set<Id> vertices = pair.getRight();
+ assert !vertices.isEmpty();
+ List<String> members = new ArrayList<>(vertices.size());
+ Map<Id, MutableInt> cedges = new HashMap<>(vertices.size());
+ for (Id v : vertices) {
+ members.add(v.toString());
+ // collect edges between this community and other communities
+ List<Edge> neighbors = neighbors(v);
+ for (Edge edge : neighbors) {
+ Vertex otherV = ((HugeEdge) edge).otherVertex();
+ if (vertices.contains(otherV.id())) {
+ // inner edges of this community, will be calc twice
+ // due to both e-in and e-out are in vertices,
+ kin += weightOfEdge(edge);
+ continue;
+ }
+ Id otherCid = cidOfVertex(otherV);
+ if (otherCid.compareTo(c.cid) < 0) {
+ // skip if it should be collected by otherC
+ continue;
+ }
+ if (!cedges.containsKey(otherCid)) {
+ cedges.put(otherCid, new MutableInt(0));
+ }
+ cedges.get(otherCid).add(weightOfEdge(edge));
+ }
+ }
+ // insert new community vertex and edges into storage
+ this.insertNewCommunity(pass, c.cid, kin, members, cedges);
+ }
+ this.graph().tx().commit();
+ // reset communities
+ this.cache.reset();
+ }
+
+ public Object louvain(int maxTimes, int stableTimes, double precision) {
+ assert maxTimes > 0;
+ assert precision > 0d;
+
+ this.defineSchemaOfPk();
+
+ /*
+ * iterate until it has stabilized or
+ * the maximum number of times is reached
+ */
+ int times = maxTimes;
+ int movedTimes = 0;
+ double movedPercent = 0d;
+ double lastMovedPercent = 0d;
+
+ for (int i = 0; i < maxTimes; i++) {
+ boolean finished = true;
+ movedPercent = 0d;
+ lastMovedPercent = 1d;
+ int tinyChanges = 0;
+ while ((movedPercent = this.moveCommunities(i)) > 0d) {
+ movedTimes++;
+ finished = false;
+ if (lastMovedPercent - movedPercent < precision) {
+ tinyChanges++;
+ }
+ if (i == 0 && movedPercent < precision) {
+ // stop the first round of iterations early
+ break;
+ }
+ if (tinyChanges >= stableTimes) {
+ // maybe always shaking and falling into an dead loop
+ break;
+ }
+ lastMovedPercent = movedPercent;
+ }
+ if (finished) {
+ times = i;
+ break;
+ } else {
+ this.defineSchemaOfPassN(i);
+ this.mergeCommunities(i);
+ }
+ }
+
+ long communities = 0L;
+ String commLabel = this.passLabel;
+ if (!commLabel.isEmpty()) {
+ GraphTraversal<?, Long> t = this.g.V().hasLabel(commLabel).count();
+ communities = this.execute(t, t::next);
+ }
+ return ImmutableMap.of("pass_times", times,
+ "phase1_times", movedTimes,
+ "last_precision", movedPercent,
+ "times", maxTimes,
+ "communities", communities);
+ }
+
+ public double modularity(int pass) {
+ // pass: label the last pass
+ String label = labelOfPassN(pass);
+ Number kin = this.g.V().hasLabel(label).values(C_KIN).sum().next();
+ Number weight = this.g.E().hasLabel(label).values(C_WEIGHT).sum().next();
+ double m = kin.intValue() + weight.floatValue() * 2.0d;
+ double q = 0.0d;
+ Iterator<Vertex> coms = this.g.V().hasLabel(label);
+ while (coms.hasNext()) {
+ Vertex com = coms.next();
+ int cin = com.value(C_KIN);
+ Number cout = this.g.V(com).bothE().values(C_WEIGHT).sum().next();
+ double cdegree = cin + cout.floatValue();
+ // Q = ∑(I/M - ((2I+O)/2M)^2)
+ q += cin / m - Math.pow(cdegree / m, 2);
+ }
+ return q;
+ }
+
+ public Collection<Object> showCommunity(String community) {
+ final String C_PASS0 = labelOfPassN(0);
+ Collection<Object> comms = Arrays.asList(community);
+ boolean reachPass0 = false;
+ while (comms.size() > 0 && !reachPass0) {
+ Iterator<Vertex> subComms = this.vertices(comms.iterator());
+ comms = new HashSet<>();
+ while (subComms.hasNext()) {
+ this.updateProgress(++this.progress);
+ Vertex sub = subComms.next();
+ if (sub.property(C_MEMBERS).isPresent()) {
+ Set<Object> members = sub.value(C_MEMBERS);
+ reachPass0 = sub.label().equals(C_PASS0);
+ comms.addAll(members);
+ }
+ }
+ }
+ return comms;
+ }
+
+ public long clearPass(int pass) {
+ GraphTraversal<Edge, Edge> te = this.g.E();
+ if (pass < 0) {
+ // drop edges of all pass
+ List<String> els = this.cpassEdgeLabels();
+ if (els.size() > 0) {
+ String first = els.remove(0);
+ te = te.hasLabel(first, els.toArray(new String[els.size()]));
+ this.drop(te);
+ }
+ // drop schema
+ for (String label : this.cpassEdgeLabels()) {
+ this.graph().schema().edgeLabel(label).remove();
+ }
+ } else {
+ // drop edges of pass N
+ String label = labelOfPassN(pass);
+ if (this.graph().existsEdgeLabel(label)) {
+ te = te.hasLabel(label);
+ this.drop(te);
+ // drop schema
+ this.graph().schema().edgeLabel(label).remove();
+ }
+ }
+
+ GraphTraversal<Vertex, Vertex> tv = this.g.V();
+ if (pass < 0) {
+ // drop vertices of all pass
+ List<String> vls = this.cpassVertexLabels();
+ if (vls.size() > 0) {
+ String first = vls.remove(0);
+ tv = tv.hasLabel(first, vls.toArray(new String[vls.size()]));
+ this.drop(tv);
+ }
+ // drop schema
+ for (String label : this.cpassVertexLabels()) {
+ this.graph().schema().vertexLabel(label).remove();
+ }
+ } else {
+ // drop vertices of pass N
+ String label = labelOfPassN(pass);
+ if (this.graph().existsVertexLabel(label)) {
+ tv = tv.hasLabel(label);
+ this.drop(tv);
+ // drop schema
+ this.graph().schema().vertexLabel(label).remove();
+ }
+ }
+
+ return this.progress;
+ }
+
+ private static class Community {
+
+ // community id (stored as a backend vertex)
+ private final Id cid;
+ // community members size
+ private int size = 0;
+ /*
+ * weight of all edges in community(2X), sum of kin of new members
+ * [each is from the last pass, stored in backend vertex]
+ */
+ private int kin = 0;
+ /*
+ * weight of all edges between communities, sum of kout of new members
+ * [each is last pass, calculated in real time by neighbors]
+ */
+ //
+ private int kout = 0;
+
+ public Community(Id cid) {
+ this.cid = cid;
+ }
+
+ public boolean empty() {
+ return this.size <= 0;
+ }
+
+ public void add(LouvainTraverser t, Vertex v, List<Edge> nbs) {
+ this.size++;
+ this.kin += t.kinOfVertex(v);
+ this.kout += t.weightOfVertex(v, nbs);
+ }
+
+ public void remove(LouvainTraverser t, Vertex v, List<Edge> nbs) {
+ this.size--;
+ this.kin -= t.kinOfVertex(v);
+ this.kout -= t.weightOfVertex(v, nbs);
+ }
+
+ public int kin() {
+ return this.kin;
+ }
+
+ public int kout() {
+ return this.kout;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("[%s](size=%s kin=%s kout=%s)",
+ this.cid , this.size, this.kin, this.kout);
+ }
+ }
+
+ private static class Cache {
+
+ private final Map<Id, Float> vertexWeightCache;
+ private final Map<Id, Community> vertex2Community;
+ private final Map<Id, Integer> genIds;
+
+ public Cache() {
+ this.vertexWeightCache = new HashMap<>();
+ this.vertex2Community = new HashMap<>();
+ this.genIds = new HashMap<>();
+ }
+
+ public Community vertex2Community(Id id) {
+ return this.vertex2Community.get(id);
+ }
+
+ public void vertex2Community(Id id, Community c) {
+ this.vertex2Community.put(id, c);
+ }
+
+ public Float vertexWeight(Id id) {
+ return this.vertexWeightCache.get(id);
+ }
+
+ public void vertexWeight(Id id, float weight) {
+ this.vertexWeightCache.put(id, weight);
+ }
+
+ public void reset() {
+ this.vertexWeightCache.clear();
+ this.vertex2Community.clear();
+ this.genIds.clear();
+ }
+
+ public void resetVertexWeight() {
+ this.vertexWeightCache.clear();
+ }
+
+ public Id genId(int pass, Id cid) {
+ if (!this.genIds.containsKey(cid)) {
+ this.genIds.put(cid, this.genIds.size() + 1);
+ }
+ String id = pass + "~" + this.genIds.get(cid);
+ return IdGenerator.of(id);
+ }
+
+ public Collection<Pair<Community, Set<Id>>> communities(){
+ // TODO: get communities from backend store instead of ram
+ Map<Id, Pair<Community, Set<Id>>> comms = new HashMap<>();
+ for (Entry<Id, Community> e : this.vertex2Community.entrySet()) {
+ Community c = e.getValue();
+ if (c.empty()) {
+ continue;
+ }
+ Pair<Community, Set<Id>> pair = comms.get(c.cid);
+ if (pair == null) {
+ pair = Pair.of(c, new HashSet<>());
+ comms.put(c.cid, pair);
+ }
+ // collect members joined to the community [current pass]
+ pair.getRight().add(e.getKey());
+ }
+ return comms.values();
+ }
+ }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
new file mode 100644
index 000000000..af7b299ae
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
@@ -0,0 +1,263 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.comm;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.schema.SchemaManager;
+import com.baidu.hugegraph.schema.VertexLabel;
+import com.baidu.hugegraph.type.define.Directions;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Label propagation (LPA) community detection exposed as an OLAP job; the
+ * community of each processed vertex is stored in its {@code c_label}
+ * property.
+ */
+public class LpaAlgorithm extends AbstractCommAlgorithm {
+
+    @Override
+    public String name() {
+        return "lpa";
+    }
+
+    @Override
+    public void checkParameters(Map<String, Object> parameters) {
+        times(parameters);
+        precision(parameters);
+        sourceLabel(parameters);
+        edgeLabel(parameters);
+        direction(parameters);
+        degree(parameters);
+        showCommunity(parameters);
+    }
+
+    /**
+     * Either lists the vertices of one community (when show-community is
+     * given) or runs LPA; failures roll back the transaction and rethrow.
+     */
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        Traverser traverser = new Traverser(job);
+        String showComm = showCommunity(parameters);
+
+        try {
+            if (showComm != null) {
+                return traverser.showCommunity(showComm);
+            } else {
+                return traverser.lpa(sourceLabel(parameters),
+                                     edgeLabel(parameters),
+                                     direction(parameters),
+                                     degree(parameters),
+                                     times(parameters),
+                                     precision(parameters));
+            }
+        } catch (Throwable e) {
+            job.graph().tx().rollback();
+            throw e;
+        }
+    }
+
+    public static class Traverser extends AlgoTraverser {
+
+        public static final String C_LABEL = "c_label";
+        private static final long LIMIT = MAX_QUERY_LIMIT;
+        // number of vertices scanned when counting the final communities
+        private static final long COMMUNITY_COUNT_LIMIT = 10000L;
+
+        private final Random random = new Random();
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Runs label propagation until the ratio of changed labels is at
+         * most {@code precision} or {@code maxTimes} rounds were done.
+         *
+         * @return a map with iteration_times, last_precision, times and
+         *         communities (distinct c_label values among the first
+         *         COMMUNITY_COUNT_LIMIT vertices)
+         */
+        public Object lpa(String sourceLabel, String edgeLabel,
+                          Directions dir, long degree,
+                          int maxTimes, double precision) {
+            assert maxTimes > 0;
+            assert precision > 0d;
+
+            this.initSchema();
+
+            // loop-invariant: resolve the edge label once, not per vertex
+            Id labelId = this.getEdgeLabelId(edgeLabel);
+
+            int times = maxTimes;
+            double changedPercent = 0d;
+
+            /*
+             * Iterate until:
+             *  1.it has stabilized
+             *  2.or the maximum number of times is reached
+             */
+            for (int i = 0; i < maxTimes; i++) {
+                changedPercent = this.detectCommunities(sourceLabel, labelId,
+                                                        dir, degree);
+                if (changedPercent <= precision) {
+                    times = i + 1;
+                    break;
+                }
+            }
+
+            // values() drops vertices that never got a c_label (e.g. those
+            // not selected by sourceLabel); groupCount().by(C_LABEL) could
+            // fail on them and abort the job after all the work was done
+            long communities = this.graph().traversal().V()
+                                   .limit(COMMUNITY_COUNT_LIMIT)
+                                   .values(C_LABEL)
+                                   .groupCount()
+                                   .count(Scope.local).next();
+            return ImmutableMap.of("iteration_times", times,
+                                   "last_precision", changedPercent,
+                                   "times", maxTimes,
+                                   "communities", communities);
+        }
+
+        /**
+         * Returns, as a JSON list, the ids of the scanned vertices (at most
+         * LIMIT) that the filter matches on c_label == {@code clabel}.
+         */
+        public Object showCommunity(String clabel) {
+            // all vertices with specified c-label
+            Iterator<Vertex> vertices = this.vertices(LIMIT);
+            vertices = filter(vertices, C_LABEL, clabel);
+
+            JsonMap json = new JsonMap();
+            json.startList();
+            while (vertices.hasNext()) {
+                this.updateProgress(++this.progress);
+                json.append(vertices.next().id().toString());
+            }
+            json.endList();
+
+            return json.asJson();
+        }
+
+        /**
+         * One LPA round over the source vertices; commits at the end.
+         *
+         * @return changed vertices / processed vertices (0 if none)
+         */
+        private double detectCommunities(String sourceLabel, Id labelId,
+                                         Directions dir, long degree) {
+            // TODO: shuffle, e.g. r.order().by(shuffle) or V().sample(LIMIT)
+
+            // all vertices
+            Iterator<Vertex> vertices = this.vertices(sourceLabel, LIMIT);
+
+            long total = 0L;
+            long changed = 0L;
+            while (vertices.hasNext()) {
+                this.updateProgress(++this.progress);
+                total++;
+                Vertex v = vertices.next();
+                String label = this.voteCommunityOfVertex(v, labelId,
+                                                          dir, degree);
+                // update label if it's absent or changed
+                if (!labelPresent(v) || !label.equals(this.labelOfVertex(v))) {
+                    changed++;
+                    this.updateLabelOfVertex(v, label);
+                }
+            }
+            this.graph().tx().commit();
+
+            return total == 0L ? 0d : (double) changed / total;
+        }
+
+        /**
+         * Picks the most frequent label among the neighbors of
+         * {@code vertex}, breaking ties at random; an isolated vertex keeps
+         * its own label.
+         */
+        private String voteCommunityOfVertex(Vertex vertex, Id labelId,
+                                             Directions dir, long degree) {
+            // neighbors of source vertex v
+            Id source = (Id) vertex.id();
+            Iterator<Id> neighbors = this.adjacentVertices(source, dir,
+                                                           labelId, degree);
+
+            // whether or not include vertex itself, greatly affects the
+            // result: including it yields more, smaller communities
+
+            // calculate label frequency
+            Map<String, MutableInt> labels = new HashMap<>();
+            while (neighbors.hasNext()) {
+                String label = this.labelOfVertex(neighbors.next());
+                if (label == null) {
+                    // ignore invalid or not-exist vertex
+                    continue;
+                }
+                MutableInt labelCount = labels.get(label);
+                if (labelCount != null) {
+                    labelCount.increment();
+                } else {
+                    labels.put(label, new MutableInt(1));
+                }
+            }
+
+            // isolated vertex
+            if (labels.size() == 0) {
+                return this.labelOfVertex(vertex);
+            }
+
+            // get the labels with maximum frequency
+            List<String> maxLabels = new ArrayList<>();
+            int maxFreq = 1;
+            for (Map.Entry<String, MutableInt> e : labels.entrySet()) {
+                int value = e.getValue().intValue();
+                if (value > maxFreq) {
+                    maxFreq = value;
+                    maxLabels.clear();
+                }
+                if (value == maxFreq) {
+                    maxLabels.add(e.getKey());
+                }
+            }
+
+            /*
+             * TODO:
+             * keep origin label with probability to prevent monster communities
+             */
+
+            // random choice
+            int selected = this.random.nextInt(maxLabels.size());
+            return maxLabels.get(selected);
+        }
+
+        private boolean labelPresent(Vertex vertex) {
+            return vertex.property(C_LABEL).isPresent();
+        }
+
+        // c_label of the vertex, defaulting to its own id
+        private String labelOfVertex(Vertex vertex) {
+            if (!labelPresent(vertex)) {
+                return vertex.id().toString();
+            }
+            return vertex.value(C_LABEL);
+        }
+
+        // c_label of the vertex with this id, or null if it does not exist
+        private String labelOfVertex(Id vid) {
+            // TODO: cache with Map<Id, String>
+            Iterator<Vertex> iter = this.graph().vertices(vid);
+            if (!iter.hasNext()) {
+                return null;
+            }
+            Vertex vertex = iter.next();
+            return this.labelOfVertex(vertex);
+        }
+
+        private void updateLabelOfVertex(Vertex v, String label) {
+            // TODO: cache with Map<Id, String>
+            v.property(C_LABEL, label);
+            this.commitIfNeeded();
+        }
+
+        // creates c_label and appends it as a nullable key to every vertex label
+        private void initSchema() {
+            String cl = C_LABEL;
+            SchemaManager schema = this.graph().schema();
+            schema.propertyKey(cl).asText().ifNotExist().create();
+            for (VertexLabel vl : schema.getVertexLabels()) {
+                schema.vertexLabel(vl.name())
+                      .properties(cl).nullableKeys(cl)
+                      .append();
+            }
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java
new file mode 100644
index 000000000..c47d19f65
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.comm;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.tinkerpop.gremlin.structure.Edge;
+
+import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.job.Job;
+import com.baidu.hugegraph.structure.HugeEdge;
+import com.baidu.hugegraph.type.define.Directions;
+import com.baidu.hugegraph.util.InsertionOrderUtil;
+import com.google.common.collect.ImmutableMap;
+
+public class TriangleCountAlgorithm extends AbstractCommAlgorithm {
+
+    @Override
+    public String name() {
+        return "triangle_count";
+    }
+
+    /**
+     * Validates the parameters.
+     * A null or BOTH direction is rejected here, rather than only once the
+     * traversal in call() starts.
+     */
+    @Override
+    public void checkParameters(Map<String, Object> parameters) {
+        checkTriangleDirection(direction(parameters));
+        degree(parameters);
+    }
+
+    @Override
+    public Object call(Job<Object> job, Map<String, Object> parameters) {
+        Traverser traverser = new Traverser(job);
+        return traverser.triangleCount(direction(parameters),
+                                       degree(parameters));
+    }
+
+    /**
+     * Triangle counting only supports a single edge direction.
+     *
+     * @throws IllegalArgumentException if direction is null or BOTH
+     */
+    private static void checkTriangleDirection(Directions direction) {
+        if (direction == null || direction == Directions.BOTH) {
+            throw new IllegalArgumentException("Direction must be OUT/IN");
+        }
+    }
+
+    protected static class Traverser extends AlgoTraverser {
+
+        protected static final String KEY_TRIANGLES = "triangles";
+        protected static final String KEY_TRIADS = "triads";
+
+        public Traverser(Job<Object> job) {
+            super(job);
+        }
+
+        /**
+         * Counts triangles along the given direction.
+         *
+         * @return the statistics from triangles(), without the "triads"
+         *         entry, which is not part of this algorithm's output
+         */
+        public Object triangleCount(Directions direction, long degree) {
+            Map<String, Long> results = this.triangles(direction, degree);
+            // Copy into a mutable, insertion-ordered map so an entry can be
+            // removed (ImmutableMap does not allow it)
+            results = InsertionOrderUtil.newMap(results);
+            results.remove(KEY_TRIADS);
+            return results;
+        }
+
+        /**
+         * Scans all edges in the given direction. For every owner vertex A
+         * with adjacency set N(A), it counts the adjacencies v -> w with
+         * both v and w in N(A) (closed triangles). It also counts the number
+         * of vertex pairs in N(A) (triads).
+         *
+         * <p>Edges are expected to arrive grouped by owner vertex. A change
+         * of owner flushes the adjacency set collected so far. Self-loops
+         * are skipped, because they cannot be a side of a triangle.
+         *
+         * @param direction OUT or IN
+         * @param degree max number of adjacent vertices kept per owner
+         *               vertex, or NO_LIMIT; it is also passed on to
+         *               adjacentVertices() when reading neighbours
+         * @return counts keyed by "edges_" + direction.string(),
+         *         "vertices_" + direction.string(), "triangles" and "triads"
+         * @throws IllegalArgumentException if direction is null or BOTH
+         */
+        protected Map<String, Long> triangles(Directions direction,
+                                              long degree) {
+            TriangleCountAlgorithm.checkTriangleDirection(direction);
+            assert direction == Directions.OUT || direction == Directions.IN;
+
+            Iterator<Edge> edges = this.edges(direction);
+
+            long triangles = 0L;
+            long triads = 0L;
+            long total = 0L;
+            long totalVertices = 0L;
+            Id vertex = null;
+
+            Set<Id> adjVertices = new HashSet<>();
+            while (edges.hasNext()) {
+                HugeEdge edge = (HugeEdge) edges.next();
+                this.updateProgress(++total);
+
+                Id source = edge.ownerVertex().id();
+                Id target = edge.otherVertex().id();
+                // Compare ids by value: the same vertex id may be carried by
+                // distinct Id instances on consecutive edges
+                if (!source.equals(vertex)) {
+                    if (vertex != null) {
+                        /*
+                         * The previous owner is complete, find graph mode
+                         * like this:
+                         * A -> [B,C,D,E,F]
+                         *      B -> [D,F]
+                         *      E -> [B,C,F]
+                         */
+                        triangles += this.intersect(direction, degree,
+                                                    adjVertices);
+                        triads += this.localTriads(adjVertices.size());
+                        totalVertices++;
+                        // Reset for the next source
+                        adjVertices = new HashSet<>();
+                    }
+                    vertex = source;
+                }
+
+                if (target.equals(source)) {
+                    // A self-loop can never be a side of a triangle
+                    continue;
+                }
+                // Ignore and skip the target vertex if exceed degree
+                if (degree == NO_LIMIT || adjVertices.size() < degree) {
+                    adjVertices.add(target);
+                }
+            }
+
+            if (vertex != null) {
+                triangles += this.intersect(direction, degree, adjVertices);
+                triads += this.localTriads(adjVertices.size());
+                totalVertices++;
+            }
+
+            String suffix = "_" + direction.string();
+            return ImmutableMap.of("edges" + suffix, total,
+                                   "vertices" + suffix, totalVertices,
+                                   KEY_TRIANGLES, triangles,
+                                   KEY_TRIADS, triads);
+        }
+
+        /**
+         * Counts the adjacencies v -> w, read in direction dir, where both v
+         * and w belong to adjVertices.
+         * Each neighbour w is counted at most once per v, so several edges
+         * between the same pair of vertices do not inflate the count. A
+         * self-loop v -> v is ignored.
+         */
+        protected long intersect(Directions dir, long degree,
+                                 Set<Id> adjVertices) {
+            long count = 0L;
+            for (Id v : adjVertices) {
+                // Only holds members of adjVertices, so stays small
+                Set<Id> counted = new HashSet<>();
+                Iterator<Id> vertices = this.adjacentVertices(v, dir, null,
+                                                              degree);
+                while (vertices.hasNext()) {
+                    Id vertex = vertices.next();
+                    if (adjVertices.contains(vertex) && !vertex.equals(v) &&
+                        counted.add(vertex)) {
+                        count++;
+                    }
+                }
+            }
+            return count;
+        }
+
+        /**
+         * Returns the number of unordered vertex pairs among {@code size}
+         * neighbours, i.e. size * (size - 1) / 2.
+         * The computation uses long arithmetic, so it does not overflow int.
+         */
+        protected long localTriads(int size) {
+            return size * (size - 1L) / 2L;
+        }
+    }
+}