You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by ji...@apache.org on 2022/11/09 10:25:20 UTC

[incubator-hugegraph] 17/33: fix no auth with worker thread of olap algo (#27)

This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git

commit 353d90698ff27196a56b3cb85edf342bcd53a8ee
Author: Jermy Li <li...@baidu.com>
AuthorDate: Tue Jul 28 22:10:03 2020 +0800

    fix no auth with worker thread of olap algo (#27)
    
    call graph close instead of closeTx
    
    Change-Id: I0e329280b067f34daec69c9b1b2b81a6cd3309bf
---
 .../baidu/hugegraph/job/algorithm/Consumers.java   | 34 ++++++++++++----------
 .../job/algorithm/SubgraphStatAlgorithm.java       | 13 +++++++--
 2 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
index 711e95edc..1c68413fc 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
@@ -30,6 +30,7 @@ import java.util.function.Consumer;
 import org.slf4j.Logger;
 
 import com.baidu.hugegraph.HugeException;
+import com.baidu.hugegraph.task.TaskManager.ContextCallable;
 import com.baidu.hugegraph.util.ExecutorUtil;
 import com.baidu.hugegraph.util.Log;
 
@@ -82,24 +83,27 @@ public class Consumers<V> {
         LOG.info("Starting {} workers[{}] with queue size {}...",
                  this.workers, name, this.queueSize);
         for (int i = 0; i < this.workers; i++) {
-            this.executor.submit(() -> {
-                try {
-                    this.run();
-                    this.done();
-                } catch (Throwable e) {
-                    // Only the first exception of one thread can be stored
-                    this.exception = e;
-                    if (!(e instanceof StopExecution)) {
-                        LOG.error("Error when running task", e);
-                    }
-                    this.done();
-                } finally {
-                    this.latch.countDown();
-                }
-            });
+            this.executor.submit(new ContextCallable<>(this::runAndDone));
         }
     }
 
+    private Void runAndDone() {
+        try {
+            this.run();
+            this.done();
+        } catch (Throwable e) {
+            // Only the first exception of one thread can be stored
+            this.exception = e;
+            if (!(e instanceof StopExecution)) {
+                LOG.error("Error when running task", e);
+            }
+            this.done();
+        } finally {
+            this.latch.countDown();
+        }
+        return null;
+    }
+
     private void run() {
         LOG.debug("Start to work...");
         while (!this.ending) {
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java
index 199d1b020..a098a8582 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.slf4j.Logger;
 
 import com.baidu.hugegraph.HugeGraph;
 import com.baidu.hugegraph.StandardHugeGraph;
@@ -40,11 +41,11 @@ import com.baidu.hugegraph.job.algorithm.comm.ClusterCoeffcientAlgorithm;
 import com.baidu.hugegraph.job.algorithm.path.RingsDetectAlgorithm;
 import com.baidu.hugegraph.job.algorithm.rank.PageRankAlgorithm;
 import com.baidu.hugegraph.task.HugeTask;
-import com.baidu.hugegraph.testutil.Whitebox;
 import com.baidu.hugegraph.traversal.algorithm.HugeTraverser;
 import com.baidu.hugegraph.traversal.optimize.HugeScriptTraversal;
 import com.baidu.hugegraph.util.E;
 import com.baidu.hugegraph.util.InsertionOrderUtil;
+import com.baidu.hugegraph.util.Log;
 import com.google.common.collect.ImmutableMap;
 
 public class SubgraphStatAlgorithm extends AbstractAlgorithm {
@@ -52,6 +53,8 @@ public class SubgraphStatAlgorithm extends AbstractAlgorithm {
     public static final String KEY_SUBGRAPH = "subgraph";
     public static final String KEY_COPY_SCHEMA = "copy_schema";
 
+    private static final Logger LOG = Log.logger(SubgraphStatAlgorithm.class);
+
     @Override
     public String name() {
         return "subgraph_stat";
@@ -77,8 +80,12 @@ public class SubgraphStatAlgorithm extends AbstractAlgorithm {
             return traverser.subgraphStat(tmpJob);
         } finally {
             graph.truncateBackend();
-            // FIXME: task thread can't call close() (will hang), use closeTx()
-            Whitebox.invoke(graph.getClass(), "closeTx", graph);
+            try {
+                graph.close();
+            } catch (Throwable e) {
+                LOG.warn("Can't close subgraph_stat temp graph {}: {}",
+                         graph, e.getMessage(), e);
+            }
         }
     }