You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by ji...@apache.org on 2022/11/09 10:25:20 UTC
[incubator-hugegraph] 17/33: fix no auth with worker thread of olap algo (#27)
This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit 353d90698ff27196a56b3cb85edf342bcd53a8ee
Author: Jermy Li <li...@baidu.com>
AuthorDate: Tue Jul 28 22:10:03 2020 +0800
fix no auth with worker thread of olap algo (#27)
call graph close instead of closeTx
Change-Id: I0e329280b067f34daec69c9b1b2b81a6cd3309bf
---
.../baidu/hugegraph/job/algorithm/Consumers.java | 34 ++++++++++++----------
.../job/algorithm/SubgraphStatAlgorithm.java | 13 +++++++--
2 files changed, 29 insertions(+), 18 deletions(-)
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
index 711e95edc..1c68413fc 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
@@ -30,6 +30,7 @@ import java.util.function.Consumer;
import org.slf4j.Logger;
import com.baidu.hugegraph.HugeException;
+import com.baidu.hugegraph.task.TaskManager.ContextCallable;
import com.baidu.hugegraph.util.ExecutorUtil;
import com.baidu.hugegraph.util.Log;
@@ -82,24 +83,27 @@ public class Consumers<V> {
LOG.info("Starting {} workers[{}] with queue size {}...",
this.workers, name, this.queueSize);
for (int i = 0; i < this.workers; i++) {
- this.executor.submit(() -> {
- try {
- this.run();
- this.done();
- } catch (Throwable e) {
- // Only the first exception of one thread can be stored
- this.exception = e;
- if (!(e instanceof StopExecution)) {
- LOG.error("Error when running task", e);
- }
- this.done();
- } finally {
- this.latch.countDown();
- }
- });
+ this.executor.submit(new ContextCallable<>(this::runAndDone));
}
}
+ private Void runAndDone() {
+ try {
+ this.run();
+ this.done();
+ } catch (Throwable e) {
+ // Only the first exception of one thread can be stored
+ this.exception = e;
+ if (!(e instanceof StopExecution)) {
+ LOG.error("Error when running task", e);
+ }
+ this.done();
+ } finally {
+ this.latch.countDown();
+ }
+ return null;
+ }
+
private void run() {
LOG.debug("Start to work...");
while (!this.ending) {
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java
index 199d1b020..a098a8582 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/SubgraphStatAlgorithm.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.slf4j.Logger;
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.StandardHugeGraph;
@@ -40,11 +41,11 @@ import com.baidu.hugegraph.job.algorithm.comm.ClusterCoeffcientAlgorithm;
import com.baidu.hugegraph.job.algorithm.path.RingsDetectAlgorithm;
import com.baidu.hugegraph.job.algorithm.rank.PageRankAlgorithm;
import com.baidu.hugegraph.task.HugeTask;
-import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.traversal.algorithm.HugeTraverser;
import com.baidu.hugegraph.traversal.optimize.HugeScriptTraversal;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.InsertionOrderUtil;
+import com.baidu.hugegraph.util.Log;
import com.google.common.collect.ImmutableMap;
public class SubgraphStatAlgorithm extends AbstractAlgorithm {
@@ -52,6 +53,8 @@ public class SubgraphStatAlgorithm extends AbstractAlgorithm {
public static final String KEY_SUBGRAPH = "subgraph";
public static final String KEY_COPY_SCHEMA = "copy_schema";
+ private static final Logger LOG = Log.logger(SubgraphStatAlgorithm.class);
+
@Override
public String name() {
return "subgraph_stat";
@@ -77,8 +80,12 @@ public class SubgraphStatAlgorithm extends AbstractAlgorithm {
return traverser.subgraphStat(tmpJob);
} finally {
graph.truncateBackend();
- // FIXME: task thread can't call close() (will hang), use closeTx()
- Whitebox.invoke(graph.getClass(), "closeTx", graph);
+ try {
+ graph.close();
+ } catch (Throwable e) {
+ LOG.warn("Can't close subgraph_stat temp graph {}: {}",
+ graph, e.getMessage(), e);
+ }
}
}