You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/05/04 11:03:27 UTC

[08/17] incubator-tinkerpop git commit: OLAP Traversals now support traversal interruption.

OLAP Traversals now support traversal interruption.

Implemented for SparkGraphComputer and TinkerGraphComputer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/173034eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/173034eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/173034eb

Branch: refs/heads/TINKERPOP-946
Commit: 173034eb51da16da9d960dc48b92f792214ab04c
Parents: 8b4e670
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Apr 22 15:26:35 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed May 4 06:43:35 2016 -0400

----------------------------------------------------------------------
 .../traversal/step/map/VertexProgramStep.java   | 18 +++-
 gremlin-driver/pom.xml                          |  1 -
 .../gremlin/process/ProcessComputerSuite.java   |  4 +
 .../TraversalInterruptionComputerTest.java      | 97 ++++++++++++++++++++
 .../traversal/TraversalInterruptionTest.java    |  2 +-
 .../gremlin/hadoop/structure/HadoopGraph.java   |  5 +
 pom.xml                                         |  5 +
 .../process/computer/SparkGraphComputer.java    | 24 ++++-
 tinkergraph-gremlin/pom.xml                     |  4 +
 .../process/computer/TinkerGraphComputer.java   | 30 +++++-
 .../process/computer/TinkerWorkerPool.java      | 12 ++-
 11 files changed, 187 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
index ab095f2..8fe7ed0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
@@ -36,9 +36,11 @@ import org.apache.tinkerpop.gremlin.structure.Graph;
 
 import java.util.NoSuchElementException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Stephen Mallette (http://stephen.genoprime.com)
  */
 public abstract class VertexProgramStep extends AbstractStep<ComputerResult, ComputerResult> implements VertexComputing {
 
@@ -52,21 +54,31 @@ public abstract class VertexProgramStep extends AbstractStep<ComputerResult, Com
 
     @Override
     protected Traverser.Admin<ComputerResult> processNextStart() throws NoSuchElementException {
+        Future<ComputerResult> future = null;
         try {
             if (this.first && this.getPreviousStep() instanceof EmptyStep) {
                 this.first = false;
                 final Graph graph = this.getTraversal().getGraph().get();
-                final ComputerResult result = this.generateComputer(graph).program(this.generateProgram(graph)).submit().get();
+                future = this.generateComputer(graph).program(this.generateProgram(graph)).submit();
+                final ComputerResult result = future.get();
                 this.processMemorySideEffects(result.memory());
                 return this.getTraversal().getTraverserGenerator().generate(result, this, 1l);
             } else {
                 final Traverser.Admin<ComputerResult> traverser = this.starts.next();
                 final Graph graph = traverser.get().graph();
-                final ComputerResult result = this.generateComputer(graph).program(this.generateProgram(graph)).submit().get();
+                future = this.generateComputer(graph).program(this.generateProgram(graph)).submit();
+                final ComputerResult result = future.get();
                 this.processMemorySideEffects(result.memory());
                 return traverser.split(result, this);
             }
-        } catch (final InterruptedException | ExecutionException e) {
+        } catch (InterruptedException ie) {
+            // the thread running the traversal took an interruption while waiting on the call the future.get().
+            // the future should then be cancelled with interruption so that the the GraphComputer that created
+            // the future knows we don't care about it anymore. The GraphComputer should attempt to respect this
+            // cancellation request.
+            if (future != null) future.cancel(true);
+            throw new TraversalInterruptedException();
+        } catch (ExecutionException e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/gremlin-driver/pom.xml
----------------------------------------------------------------------
diff --git a/gremlin-driver/pom.xml b/gremlin-driver/pom.xml
index 515f765..ecbf25d 100644
--- a/gremlin-driver/pom.xml
+++ b/gremlin-driver/pom.xml
@@ -60,7 +60,6 @@ limitations under the License.
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
-            <version>3.3.1</version>
         </dependency>
         <!-- TEST -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessComputerSuite.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessComputerSuite.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessComputerSuite.java
index 8f85e62..4c183dc 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessComputerSuite.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessComputerSuite.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVerte
 import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgramTest;
 import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgramTest;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalEngine;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionComputerTest;
 import org.apache.tinkerpop.gremlin.process.traversal.step.branch.BranchTest;
 import org.apache.tinkerpop.gremlin.process.traversal.step.branch.ChooseTest;
 import org.apache.tinkerpop.gremlin.process.traversal.step.branch.OptionalTest;
@@ -182,6 +183,9 @@ public class ProcessComputerSuite extends AbstractGremlinSuite {
             SubgraphTest.Traversals.class,
             TreeTest.Traversals.class,
 
+            // compliance
+            TraversalInterruptionComputerTest.class,
+
             // algorithms
             PageRankVertexProgramTest.class,
             PeerPressureVertexProgramTest.class,

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionComputerTest.java
new file mode 100644
index 0000000..c3a1042
--- /dev/null
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionComputerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal;
+
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
+
+import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.GRATEFUL;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+@RunWith(Parameterized.class)
+public class TraversalInterruptionComputerTest extends AbstractGremlinProcessTest {
+
+    @Parameterized.Parameters(name = "expectInterruption({0})")
+    public static Iterable<Object[]> data() {
+        return Arrays.asList(new Object[][]{
+                {"g_V", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V()},
+                {"g_V_out", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V().out()},
+                {"g_V_outE", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V().outE()},
+                {"g_V_in", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V().in()},
+                {"g_V_inE", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V().inE()},
+                {"g_V_properties", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V().properties()},
+                {"g_E", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.E()},
+                {"g_E_outV", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.E().outV()},
+                {"g_E_inV", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.E().inV()},
+                {"g_E_properties", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.E().properties()},
+        });
+    }
+
+    @Parameterized.Parameter(value = 0)
+    public String name;
+
+    @Parameterized.Parameter(value = 1)
+    public Function<GraphTraversalSource,GraphTraversal<?,?>> traversalMaker;
+
+    @Test
+    @LoadGraphWith(GRATEFUL)
+    public void shouldRespectThreadInterruptionInVertexStep() throws Exception {
+        final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+        final CountDownLatch startedIterating = new CountDownLatch(1);
+        final Thread t = new Thread(() -> {
+            try {
+                final Traversal traversal = traversalMaker.apply(g).sideEffect(traverser -> {
+                    startedIterating.countDown();
+                });
+                traversal.iterate();
+            } catch (Exception ex) {
+                exceptionThrown.set(ex instanceof TraversalInterruptedException);
+            }
+        }, name);
+
+        t.start();
+
+        // total time for test should not exceed 5 seconds - this prevents the test from just hanging and allows
+        // it to finish with failure
+        assertThat(startedIterating.await(5000, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
+
+        t.interrupt();
+        t.join();
+
+        // ensure that some but not all of the traversal was iterated and that the right exception was tossed
+        assertThat(exceptionThrown.get(), CoreMatchers.is(true));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
index e9d584f..c508df2 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
@@ -97,7 +97,7 @@ public class TraversalInterruptionTest extends AbstractGremlinProcessTest {
             } catch (Exception ex) {
                 exceptionThrown.set(ex instanceof TraversalInterruptedException);
             }
-        });
+        }, name);
 
         t.start();
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index 3dd0d9a..08bbeeb 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -181,6 +181,11 @@ import java.util.stream.Stream;
         method = "shouldStartAndEndWorkersForVertexProgramAndMapReduce",
         reason = "Spark executes map and combine in a lazy fashion and thus, fails the blocking aspect of this test",
         computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"})
+@Graph.OptOut(
+        test = "org.apache.tinkerpop.gremlin.process.computer.TraversalInterruptionComputerTest",
+        method = "*",
+        reason = "This test makes an use of a sideEffect to enforce when a thread interruption is triggered and thus isn't applicable to HadoopGraph",
+        computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"})
 public final class HadoopGraph implements Graph {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(HadoopGraph.class);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 28e932c..bd3584d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -499,6 +499,11 @@ limitations under the License.
                 <version>3.2.2</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-lang3</artifactId>
+                <version>3.3.1</version>
+            </dependency>
+            <dependency>
                 <groupId>org.apache.hadoop</groupId>
                 <artifactId>hadoop-core</artifactId>
                 <version>1.2.1</version>

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 928a880..bc8bc50 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.spark.process.computer;
 import org.apache.commons.configuration.ConfigurationUtils;
 import org.apache.commons.configuration.FileConfiguration;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -51,6 +52,7 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
 import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
@@ -67,9 +69,11 @@ import org.apache.tinkerpop.gremlin.structure.io.Storage;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 import java.util.stream.Stream;
 
 /**
@@ -77,6 +81,15 @@ import java.util.stream.Stream;
  */
 public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
+
+    private final ThreadFactory threadFactoryBoss = new BasicThreadFactory.Builder().namingPattern(SparkGraphComputer.class.getSimpleName() + "-boss").build();
+
+    /**
+     * An {@code ExecutorService} that schedules up background work. Since a {@link GraphComputer} is only used once
+     * for a {@link VertexProgram} a single threaded executor is sufficient.
+     */
+    private final ExecutorService computerService = Executors.newSingleThreadExecutor(threadFactoryBoss);
+
     private final org.apache.commons.configuration.Configuration sparkConfiguration;
     private boolean workersSet = false;
 
@@ -110,7 +123,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
     private Future<ComputerResult> submitWithExecutor(Executor exec) {
         // create the completable future                                                   �
-        return CompletableFuture.<ComputerResult>supplyAsync(() -> {
+        return computerService.submit(() -> {
             final long startTime = System.currentTimeMillis();
             // apache and hadoop configurations that are used throughout the graph computer computation
             final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
@@ -219,7 +232,6 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 if (!inputFromSpark || partitioned || filtered)
                     loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
 
-
                 ////////////////////////////////
                 // process the vertex program //
                 ////////////////////////////////
@@ -235,6 +247,10 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
                     // execute the vertex program
                     while (true) {
+                        if (Thread.interrupted()) {
+                            sparkContext.cancelAllJobs();
+                            throw new TraversalInterruptedException();
+                        }
                         memory.setInExecute(true);
                         viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
                         memory.setInExecute(false);
@@ -312,7 +328,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 if (!apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
                     Spark.close();
             }
-        }, exec);
+        });
     }
 
     /////////////////

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/tinkergraph-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/pom.xml b/tinkergraph-gremlin/pom.xml
index dd48df6..0f3cecd 100644
--- a/tinkergraph-gremlin/pom.xml
+++ b/tinkergraph-gremlin/pom.xml
@@ -31,6 +31,10 @@ limitations under the License.
             <artifactId>gremlin-core</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
         <!-- provided scope for gremlin-groovy because it is only used for purpose of scriptengine plugin in
              the console and server - in which case that jar should already be present -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 5fb477c..2628ff1 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
 
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
@@ -27,6 +28,7 @@ import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -41,8 +43,10 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -62,6 +66,14 @@ public final class TinkerGraphComputer implements GraphComputer {
     private int workers = Runtime.getRuntime().availableProcessors();
     private final GraphFilter graphFilter = new GraphFilter();
 
+    private final ThreadFactory threadFactoryBoss = new BasicThreadFactory.Builder().namingPattern(TinkerGraphComputer.class.getSimpleName() + "-boss").build();
+
+    /**
+     * An {@code ExecutorService} that schedules up background work. Since a {@link GraphComputer} is only used once
+     * for a {@link VertexProgram} a single threaded executor is sufficient.
+     */
+    private final ExecutorService computerService = Executors.newSingleThreadExecutor(threadFactoryBoss);
+
     public TinkerGraphComputer(final TinkerGraph graph) {
         this.graph = graph;
     }
@@ -134,15 +146,17 @@ public final class TinkerGraphComputer implements GraphComputer {
 
         // initialize the memory
         this.memory = new TinkerMemory(this.vertexProgram, this.mapReducers);
-        return CompletableFuture.<ComputerResult>supplyAsync(() -> {
+        return computerService.submit(() -> {
             final long time = System.currentTimeMillis();
             final TinkerGraphComputerView view;
-            try (final TinkerWorkerPool workers = new TinkerWorkerPool(this.workers)) {
+            final TinkerWorkerPool workers = new TinkerWorkerPool(this.workers);
+            try {
                 if (null != this.vertexProgram) {
                     view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, this.vertexProgram.getVertexComputeKeys());
                     // execute the vertex program
                     this.vertexProgram.setup(this.memory);
                     while (true) {
+                        if (Thread.interrupted()) throw new TraversalInterruptedException();
                         this.memory.completeSubRound();
                         workers.setVertexProgram(this.vertexProgram);
                         final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
@@ -150,6 +164,7 @@ public final class TinkerGraphComputer implements GraphComputer {
                             vertexProgram.workerIterationStart(this.memory.asImmutable());
                             while (true) {
                                 final Vertex vertex = vertices.next();
+                                if (Thread.interrupted()) throw new TraversalInterruptedException();
                                 if (null == vertex) break;
                                 vertexProgram.execute(
                                         ComputerGraph.vertexProgram(vertex, vertexProgram),
@@ -182,6 +197,7 @@ public final class TinkerGraphComputer implements GraphComputer {
                     workers.executeMapReduce(workerMapReduce -> {
                         workerMapReduce.workerStart(MapReduce.Stage.MAP);
                         while (true) {
+                            if (Thread.interrupted()) throw new TraversalInterruptedException();
                             final Vertex vertex = vertices.next();
                             if (null == vertex) break;
                             workerMapReduce.map(ComputerGraph.mapReduce(vertex), mapEmitter);
@@ -198,6 +214,7 @@ public final class TinkerGraphComputer implements GraphComputer {
                         workers.executeMapReduce(workerMapReduce -> {
                             workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
                             while (true) {
+                                if (Thread.interrupted()) throw new TraversalInterruptedException();
                                 final Map.Entry<?, Queue<?>> entry = keyValues.next();
                                 if (null == entry) break;
                                 workerMapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter);
@@ -217,9 +234,14 @@ public final class TinkerGraphComputer implements GraphComputer {
                 final Graph resultGraph = view.processResultGraphPersist(this.resultGraph, this.persist);
                 TinkerHelper.dropGraphComputerView(this.graph); // drop the view from the original source graph
                 return new DefaultComputerResult(resultGraph, this.memory.asImmutable());
-
+            } catch (InterruptedException ie) {
+                workers.closeNow();
+                throw new TraversalInterruptedException();
             } catch (Exception ex) {
+                workers.closeNow();
                 throw new RuntimeException(ex);
+            } finally {
+                workers.close();
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index e9341b4..3d851bf 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -59,7 +59,7 @@ public final class TinkerWorkerPool implements AutoCloseable {
         this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers);
     }
 
-    public void executeVertexProgram(final Consumer<VertexProgram> worker) {
+    public void executeVertexProgram(final Consumer<VertexProgram> worker) throws InterruptedException {
         for (int i = 0; i < this.numberOfWorkers; i++) {
             this.completionService.submit(() -> {
                 final VertexProgram vp = this.vertexProgramPool.take();
@@ -71,13 +71,15 @@ public final class TinkerWorkerPool implements AutoCloseable {
         for (int i = 0; i < this.numberOfWorkers; i++) {
             try {
                 this.completionService.take().get();
+            } catch (InterruptedException ie) {
+                throw ie;
             } catch (final Exception e) {
                 throw new IllegalStateException(e.getMessage(), e);
             }
         }
     }
 
-    public void executeMapReduce(final Consumer<MapReduce> worker) {
+    public void executeMapReduce(final Consumer<MapReduce> worker) throws InterruptedException {
         for (int i = 0; i < this.numberOfWorkers; i++) {
             this.completionService.submit(() -> {
                 final MapReduce mr = this.mapReducePool.take();
@@ -89,12 +91,18 @@ public final class TinkerWorkerPool implements AutoCloseable {
         for (int i = 0; i < this.numberOfWorkers; i++) {
             try {
                 this.completionService.take().get();
+            } catch (InterruptedException ie) {
+                throw ie;
             } catch (final Exception e) {
                 throw new IllegalStateException(e.getMessage(), e);
             }
         }
     }
 
+    public void closeNow() throws Exception {
+        this.workerPool.shutdownNow();
+    }
+
     @Override
     public void close() throws Exception {
         this.workerPool.shutdown();