You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by ji...@apache.org on 2022/11/09 10:25:12 UTC

[incubator-hugegraph] 09/33: fix parallel LPA not committing in worker threads (#16)

This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git

commit beda055142988798dceed6f8be7b0a0b616f05e6
Author: Jermy Li <li...@baidu.com>
AuthorDate: Wed May 20 16:54:44 2020 +0800

    fix parallel LPA not committing in worker threads (#16)
    
    Change-Id: I8eaaeccaa0b23048a9d0f597080186c069b9799b
---
 .../com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java     | 7 ++++++-
 .../main/java/com/baidu/hugegraph/job/algorithm/Consumers.java   | 9 +++++++--
 .../com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java     | 5 +++--
 3 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
index c36a70405..5bb3426ff 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
@@ -325,11 +325,16 @@ public abstract class AbstractAlgorithm implements Algorithm {
 
         protected long traverse(String sourceLabel, String sourceCLabel,
                                 Consumer<Vertex> consumer) {
+            return this.traverse(sourceLabel, sourceCLabel, consumer, null);
+        }
+
+        protected long traverse(String sourceLabel, String sourceCLabel,
+                                Consumer<Vertex> consumer, Runnable done) {
             Iterator<Vertex> vertices = this.vertices(sourceLabel, sourceLabel,
                                                       Query.NO_LIMIT);
 
             Consumers<Vertex> consumers = new Consumers<>(this.executor,
-                                                          consumer);
+                                                          consumer, done);
             consumers.start();
 
             long total = 0L;
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
index 795e0d712..526419c46 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
@@ -122,7 +122,7 @@ public class Consumers<V> {
 
     public void provide(V v) {
         if (this.executor == null) {
-            // do job directly
+            // do job directly if without thread pool
             this.consumer.accept(v);
         } else {
             try {
@@ -135,7 +135,12 @@ public class Consumers<V> {
 
     public void await() {
         this.ending = true;
-        if (this.executor != null) {
+        if (this.executor == null) {
+            // call done() directly if without thread pool
+            if (this.done != null) {
+                this.done.run();
+            }
+        } else {
             try {
                 this.latch.await();
             } catch (InterruptedException e) {
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
index e98ed8480..59c420ae7 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
@@ -152,10 +152,11 @@ public class LpaAlgorithm extends AbstractCommAlgorithm {
                 if (this.voteCommunityAndUpdate(v, edgeLabel, dir, degree)) {
                     changed.incrementAndGet();
                 }
+            }, () -> {
+                // commit when finished
+                this.graph().tx().commit();
             });
 
-            this.graph().tx().commit();
-
             return total == 0L ? 0d : changed.doubleValue() / total;
         }