You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/05/04 11:03:27 UTC
[08/17] incubator-tinkerpop git commit: OLAP Traversals now support
traversal interruption.
OLAP Traversals now support traversal interruption.
Implemented for SparkGraphComputer and TinkerGraphComputer.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/173034eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/173034eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/173034eb
Branch: refs/heads/TINKERPOP-946
Commit: 173034eb51da16da9d960dc48b92f792214ab04c
Parents: 8b4e670
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Apr 22 15:26:35 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed May 4 06:43:35 2016 -0400
----------------------------------------------------------------------
.../traversal/step/map/VertexProgramStep.java | 18 +++-
gremlin-driver/pom.xml | 1 -
.../gremlin/process/ProcessComputerSuite.java | 4 +
.../TraversalInterruptionComputerTest.java | 97 ++++++++++++++++++++
.../traversal/TraversalInterruptionTest.java | 2 +-
.../gremlin/hadoop/structure/HadoopGraph.java | 5 +
pom.xml | 5 +
.../process/computer/SparkGraphComputer.java | 24 ++++-
tinkergraph-gremlin/pom.xml | 4 +
.../process/computer/TinkerGraphComputer.java | 30 +++++-
.../process/computer/TinkerWorkerPool.java | 12 ++-
11 files changed, 187 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
index ab095f2..8fe7ed0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
@@ -36,9 +36,11 @@ import org.apache.tinkerpop.gremlin.structure.Graph;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Stephen Mallette (http://stephen.genoprime.com)
*/
public abstract class VertexProgramStep extends AbstractStep<ComputerResult, ComputerResult> implements VertexComputing {
@@ -52,21 +54,31 @@ public abstract class VertexProgramStep extends AbstractStep<ComputerResult, Com
@Override
protected Traverser.Admin<ComputerResult> processNextStart() throws NoSuchElementException {
+ Future<ComputerResult> future = null;
try {
if (this.first && this.getPreviousStep() instanceof EmptyStep) {
this.first = false;
final Graph graph = this.getTraversal().getGraph().get();
- final ComputerResult result = this.generateComputer(graph).program(this.generateProgram(graph)).submit().get();
+ future = this.generateComputer(graph).program(this.generateProgram(graph)).submit();
+ final ComputerResult result = future.get();
this.processMemorySideEffects(result.memory());
return this.getTraversal().getTraverserGenerator().generate(result, this, 1l);
} else {
final Traverser.Admin<ComputerResult> traverser = this.starts.next();
final Graph graph = traverser.get().graph();
- final ComputerResult result = this.generateComputer(graph).program(this.generateProgram(graph)).submit().get();
+ future = this.generateComputer(graph).program(this.generateProgram(graph)).submit();
+ final ComputerResult result = future.get();
this.processMemorySideEffects(result.memory());
return traverser.split(result, this);
}
- } catch (final InterruptedException | ExecutionException e) {
+ } catch (InterruptedException ie) {
+ // the thread running the traversal took an interruption while waiting on the call to future.get().
+ // the future should then be cancelled with interruption so that the GraphComputer that created
+ // the future knows we don't care about it anymore. The GraphComputer should attempt to respect this
+ // cancellation request.
+ if (future != null) future.cancel(true);
+ throw new TraversalInterruptedException();
+ } catch (ExecutionException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/gremlin-driver/pom.xml
----------------------------------------------------------------------
diff --git a/gremlin-driver/pom.xml b/gremlin-driver/pom.xml
index 515f765..ecbf25d 100644
--- a/gremlin-driver/pom.xml
+++ b/gremlin-driver/pom.xml
@@ -60,7 +60,6 @@ limitations under the License.
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
- <version>3.3.1</version>
</dependency>
<!-- TEST -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessComputerSuite.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessComputerSuite.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessComputerSuite.java
index 8f85e62..4c183dc 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessComputerSuite.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessComputerSuite.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVerte
import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgramTest;
import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgramTest;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalEngine;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionComputerTest;
import org.apache.tinkerpop.gremlin.process.traversal.step.branch.BranchTest;
import org.apache.tinkerpop.gremlin.process.traversal.step.branch.ChooseTest;
import org.apache.tinkerpop.gremlin.process.traversal.step.branch.OptionalTest;
@@ -182,6 +183,9 @@ public class ProcessComputerSuite extends AbstractGremlinSuite {
SubgraphTest.Traversals.class,
TreeTest.Traversals.class,
+ // compliance
+ TraversalInterruptionComputerTest.class,
+
// algorithms
PageRankVertexProgramTest.class,
PeerPressureVertexProgramTest.class,
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionComputerTest.java
new file mode 100644
index 0000000..c3a1042
--- /dev/null
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionComputerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal;
+
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
+
+import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.GRATEFUL;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+@RunWith(Parameterized.class)
+public class TraversalInterruptionComputerTest extends AbstractGremlinProcessTest {
+
+ @Parameterized.Parameters(name = "expectInterruption({0})")
+ public static Iterable<Object[]> data() {
+ return Arrays.asList(new Object[][]{
+ {"g_V", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V()},
+ {"g_V_out", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V().out()},
+ {"g_V_outE", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V().outE()},
+ {"g_V_in", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V().in()},
+ {"g_V_inE", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V().inE()},
+ {"g_V_properties", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V().properties()},
+ {"g_E", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.E()},
+ {"g_E_outV", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.E().outV()},
+ {"g_E_inV", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.E().inV()},
+ {"g_E_properties", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.E().properties()},
+ });
+ }
+
+ @Parameterized.Parameter(value = 0)
+ public String name;
+
+ @Parameterized.Parameter(value = 1)
+ public Function<GraphTraversalSource,GraphTraversal<?,?>> traversalMaker;
+
+ @Test
+ @LoadGraphWith(GRATEFUL)
+ public void shouldRespectThreadInterruptionInVertexStep() throws Exception {
+ final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+ final CountDownLatch startedIterating = new CountDownLatch(1);
+ final Thread t = new Thread(() -> {
+ try {
+ final Traversal traversal = traversalMaker.apply(g).sideEffect(traverser -> {
+ startedIterating.countDown();
+ });
+ traversal.iterate();
+ } catch (Exception ex) {
+ exceptionThrown.set(ex instanceof TraversalInterruptedException);
+ }
+ }, name);
+
+ t.start();
+
+ // total time for test should not exceed 5 seconds - this prevents the test from just hanging and allows
+ // it to finish with failure
+ assertThat(startedIterating.await(5000, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
+
+ t.interrupt();
+ t.join();
+
+ // ensure that the traversal was interrupted and that the right exception was tossed
+ assertThat(exceptionThrown.get(), CoreMatchers.is(true));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
index e9d584f..c508df2 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
@@ -97,7 +97,7 @@ public class TraversalInterruptionTest extends AbstractGremlinProcessTest {
} catch (Exception ex) {
exceptionThrown.set(ex instanceof TraversalInterruptedException);
}
- });
+ }, name);
t.start();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index 3dd0d9a..08bbeeb 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -181,6 +181,11 @@ import java.util.stream.Stream;
method = "shouldStartAndEndWorkersForVertexProgramAndMapReduce",
reason = "Spark executes map and combine in a lazy fashion and thus, fails the blocking aspect of this test",
computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"})
+@Graph.OptOut(
+ test = "org.apache.tinkerpop.gremlin.process.computer.TraversalInterruptionComputerTest",
+ method = "*",
+ reason = "This test makes an use of a sideEffect to enforce when a thread interruption is triggered and thus isn't applicable to HadoopGraph",
+ computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"})
public final class HadoopGraph implements Graph {
public static final Logger LOGGER = LoggerFactory.getLogger(HadoopGraph.class);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 28e932c..bd3584d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -499,6 +499,11 @@ limitations under the License.
<version>3.2.2</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.3.1</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 928a880..bc8bc50 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.spark.process.computer;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.FileConfiguration;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -51,6 +52,7 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
@@ -67,9 +69,11 @@ import org.apache.tinkerpop.gremlin.structure.io.Storage;
import java.io.File;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
import java.util.stream.Stream;
/**
@@ -77,6 +81,15 @@ import java.util.stream.Stream;
*/
public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
+
+ private final ThreadFactory threadFactoryBoss = new BasicThreadFactory.Builder().namingPattern(SparkGraphComputer.class.getSimpleName() + "-boss").build();
+
+ /**
+ * An {@code ExecutorService} that schedules up background work. Since a {@link GraphComputer} is only used once
+ * for a {@link VertexProgram} a single threaded executor is sufficient.
+ */
+ private final ExecutorService computerService = Executors.newSingleThreadExecutor(threadFactoryBoss);
+
private final org.apache.commons.configuration.Configuration sparkConfiguration;
private boolean workersSet = false;
@@ -110,7 +123,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
private Future<ComputerResult> submitWithExecutor(Executor exec) {
// create the completable future
- return CompletableFuture.<ComputerResult>supplyAsync(() -> {
+ return computerService.submit(() -> {
final long startTime = System.currentTimeMillis();
// apache and hadoop configurations that are used throughout the graph computer computation
final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
@@ -219,7 +232,6 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
if (!inputFromSpark || partitioned || filtered)
loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
-
////////////////////////////////
// process the vertex program //
////////////////////////////////
@@ -235,6 +247,10 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
// execute the vertex program
while (true) {
+ if (Thread.interrupted()) {
+ sparkContext.cancelAllJobs();
+ throw new TraversalInterruptedException();
+ }
memory.setInExecute(true);
viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
memory.setInExecute(false);
@@ -312,7 +328,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
if (!apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
Spark.close();
}
- }, exec);
+ });
}
/////////////////
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/tinkergraph-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/pom.xml b/tinkergraph-gremlin/pom.xml
index dd48df6..0f3cecd 100644
--- a/tinkergraph-gremlin/pom.xml
+++ b/tinkergraph-gremlin/pom.xml
@@ -31,6 +31,10 @@ limitations under the License.
<artifactId>gremlin-core</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
<!-- provided scope for gremlin-groovy because it is only used for purpose of scriptengine plugin in
the console and server - in which case that jar should already be present -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 5fb477c..2628ff1 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
@@ -27,6 +28,7 @@ import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -41,8 +43,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -62,6 +66,14 @@ public final class TinkerGraphComputer implements GraphComputer {
private int workers = Runtime.getRuntime().availableProcessors();
private final GraphFilter graphFilter = new GraphFilter();
+ private final ThreadFactory threadFactoryBoss = new BasicThreadFactory.Builder().namingPattern(TinkerGraphComputer.class.getSimpleName() + "-boss").build();
+
+ /**
+ * An {@code ExecutorService} that schedules up background work. Since a {@link GraphComputer} is only used once
+ * for a {@link VertexProgram} a single threaded executor is sufficient.
+ */
+ private final ExecutorService computerService = Executors.newSingleThreadExecutor(threadFactoryBoss);
+
public TinkerGraphComputer(final TinkerGraph graph) {
this.graph = graph;
}
@@ -134,15 +146,17 @@ public final class TinkerGraphComputer implements GraphComputer {
// initialize the memory
this.memory = new TinkerMemory(this.vertexProgram, this.mapReducers);
- return CompletableFuture.<ComputerResult>supplyAsync(() -> {
+ return computerService.submit(() -> {
final long time = System.currentTimeMillis();
final TinkerGraphComputerView view;
- try (final TinkerWorkerPool workers = new TinkerWorkerPool(this.workers)) {
+ final TinkerWorkerPool workers = new TinkerWorkerPool(this.workers);
+ try {
if (null != this.vertexProgram) {
view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, this.vertexProgram.getVertexComputeKeys());
// execute the vertex program
this.vertexProgram.setup(this.memory);
while (true) {
+ if (Thread.interrupted()) throw new TraversalInterruptedException();
this.memory.completeSubRound();
workers.setVertexProgram(this.vertexProgram);
final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
@@ -150,6 +164,7 @@ public final class TinkerGraphComputer implements GraphComputer {
vertexProgram.workerIterationStart(this.memory.asImmutable());
while (true) {
final Vertex vertex = vertices.next();
+ if (Thread.interrupted()) throw new TraversalInterruptedException();
if (null == vertex) break;
vertexProgram.execute(
ComputerGraph.vertexProgram(vertex, vertexProgram),
@@ -182,6 +197,7 @@ public final class TinkerGraphComputer implements GraphComputer {
workers.executeMapReduce(workerMapReduce -> {
workerMapReduce.workerStart(MapReduce.Stage.MAP);
while (true) {
+ if (Thread.interrupted()) throw new TraversalInterruptedException();
final Vertex vertex = vertices.next();
if (null == vertex) break;
workerMapReduce.map(ComputerGraph.mapReduce(vertex), mapEmitter);
@@ -198,6 +214,7 @@ public final class TinkerGraphComputer implements GraphComputer {
workers.executeMapReduce(workerMapReduce -> {
workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
while (true) {
+ if (Thread.interrupted()) throw new TraversalInterruptedException();
final Map.Entry<?, Queue<?>> entry = keyValues.next();
if (null == entry) break;
workerMapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter);
@@ -217,9 +234,14 @@ public final class TinkerGraphComputer implements GraphComputer {
final Graph resultGraph = view.processResultGraphPersist(this.resultGraph, this.persist);
TinkerHelper.dropGraphComputerView(this.graph); // drop the view from the original source graph
return new DefaultComputerResult(resultGraph, this.memory.asImmutable());
-
+ } catch (InterruptedException ie) {
+ workers.closeNow();
+ throw new TraversalInterruptedException();
} catch (Exception ex) {
+ workers.closeNow();
throw new RuntimeException(ex);
+ } finally {
+ workers.close();
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index e9341b4..3d851bf 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -59,7 +59,7 @@ public final class TinkerWorkerPool implements AutoCloseable {
this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers);
}
- public void executeVertexProgram(final Consumer<VertexProgram> worker) {
+ public void executeVertexProgram(final Consumer<VertexProgram> worker) throws InterruptedException {
for (int i = 0; i < this.numberOfWorkers; i++) {
this.completionService.submit(() -> {
final VertexProgram vp = this.vertexProgramPool.take();
@@ -71,13 +71,15 @@ public final class TinkerWorkerPool implements AutoCloseable {
for (int i = 0; i < this.numberOfWorkers; i++) {
try {
this.completionService.take().get();
+ } catch (InterruptedException ie) {
+ throw ie;
} catch (final Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
- public void executeMapReduce(final Consumer<MapReduce> worker) {
+ public void executeMapReduce(final Consumer<MapReduce> worker) throws InterruptedException {
for (int i = 0; i < this.numberOfWorkers; i++) {
this.completionService.submit(() -> {
final MapReduce mr = this.mapReducePool.take();
@@ -89,12 +91,18 @@ public final class TinkerWorkerPool implements AutoCloseable {
for (int i = 0; i < this.numberOfWorkers; i++) {
try {
this.completionService.take().get();
+ } catch (InterruptedException ie) {
+ throw ie;
} catch (final Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
+ public void closeNow() throws Exception {
+ this.workerPool.shutdownNow();
+ }
+
@Override
public void close() throws Exception {
this.workerPool.shutdown();