You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/02/13 17:06:24 UTC

incubator-tinkerpop git commit: Make traversals interruptable.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master e1da1e148 -> f5b211f02


Make traversals interruptable.

Traversals now check if the current thread running it has been interrupted thus allowing traversals to be "cancelled". Added many tests around this in the GremlinExecutor and script engine.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/f5b211f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/f5b211f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/f5b211f0

Branch: refs/heads/master
Commit: f5b211f029ba1997dd2c751bf9a8aa859b39664c
Parents: e1da1e1
Author: Stephen Mallette <sp...@apache.org>
Authored: Fri Feb 13 11:04:28 2015 -0500
Committer: Stephen Mallette <sp...@apache.org>
Committed: Fri Feb 13 11:04:28 2015 -0500

----------------------------------------------------------------------
 .../tinkerpop/gremlin/process/Traversal.java    | 17 ++++
 .../process/TraversalInterruptedException.java  | 22 +++++
 .../util/InterruptedRuntimeException.java       | 23 +++++
 .../groovy/engine/GremlinExecutorTest.java      | 77 +++++++++++++--
 .../jsr223/GremlinGroovyScriptEngineTest.java   | 31 ++++++
 .../gremlin/groovy/engine/GremlinExecutor.java  | 94 ++++++++++---------
 .../process/traversal/CoreTraversalTest.java    | 99 ++++++++++++++++++++
 7 files changed, 313 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f5b211f0/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traversal.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traversal.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traversal.java
index 869c46b..d50efe0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traversal.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traversal.java
@@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.process.traverser.TraverserRequirement;
 import org.apache.tinkerpop.gremlin.process.util.BulkSet;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.util.InterruptedRuntimeException;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -111,6 +112,7 @@ public interface Traversal<S, E> extends Iterator<E>, Serializable, Cloneable {
         final List<E> result = new ArrayList<>();
         int counter = 0;
         while (counter++ < amount && this.hasNext()) {
+            if (Thread.interrupted()) throw new TraversalInterruptedException(this);
             result.add(this.next());
         }
         return result;
@@ -157,6 +159,7 @@ public interface Traversal<S, E> extends Iterator<E>, Serializable, Cloneable {
             // use the end step so the results are bulked
             final Step<?, E> endStep = this.asAdmin().getEndStep();
             while (true) {
+                if (Thread.interrupted()) throw new TraversalInterruptedException(this);
                 final Traverser<E> traverser = endStep.next();
                 TraversalHelper.addToCollection(collection, traverser.get(), traverser.bulk());
             }
@@ -179,6 +182,7 @@ public interface Traversal<S, E> extends Iterator<E>, Serializable, Cloneable {
             // use the end step so the results are bulked
             final Step<?, E> endStep = this.asAdmin().getEndStep();
             while (true) {
+                if (Thread.interrupted()) throw new TraversalInterruptedException(this);
                 endStep.next();
             }
         } catch (final NoSuchElementException ignored) {
@@ -197,6 +201,7 @@ public interface Traversal<S, E> extends Iterator<E>, Serializable, Cloneable {
     public default <E2> void forEachRemaining(final Class<E2> endType, final Consumer<E2> consumer) {
         try {
             while (true) {
+                if (Thread.interrupted()) throw new TraversalInterruptedException(this);
                 consumer.accept((E2) next());
             }
         } catch (final NoSuchElementException ignore) {
@@ -204,6 +209,18 @@ public interface Traversal<S, E> extends Iterator<E>, Serializable, Cloneable {
         }
     }
 
+    @Override
+    public default void forEachRemaining(final Consumer<? super E> action) {
+        try {
+            while (true) {
+                if (Thread.interrupted()) throw new TraversalInterruptedException(this);
+                action.accept(next());
+            }
+        } catch (final NoSuchElementException ignore) {
+
+        }
+    }
+
     /**
      * A collection of {@link Exception} types associated with Traversal execution.
      */

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f5b211f0/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalInterruptedException.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalInterruptedException.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalInterruptedException.java
new file mode 100644
index 0000000..b6304fd
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalInterruptedException.java
@@ -0,0 +1,22 @@
+package org.apache.tinkerpop.gremlin.process;
+
+import org.apache.tinkerpop.gremlin.util.InterruptedRuntimeException;
+
+/**
+ * An unchecked exception thrown when the current thread processing a {@link Traversal} is interrupted.
+ *
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public class TraversalInterruptedException extends InterruptedRuntimeException {
+    private final Traversal interruptedTraversal;
+
+    public TraversalInterruptedException(final Traversal interruptedTraversal) {
+        super(String.format("The %s thread received interruption notification while iterating %s - it did not complete",
+                Thread.currentThread().getName(), interruptedTraversal));
+        this.interruptedTraversal = interruptedTraversal;
+    }
+
+    public Traversal getInterruptedTraversal() {
+        return interruptedTraversal;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f5b211f0/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/InterruptedRuntimeException.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/InterruptedRuntimeException.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/InterruptedRuntimeException.java
new file mode 100644
index 0000000..e21db3e
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/InterruptedRuntimeException.java
@@ -0,0 +1,23 @@
+package org.apache.tinkerpop.gremlin.util;
+
+/**
+ * An unchecked version of the {@link java.lang.InterruptedException}.
+ *
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public class InterruptedRuntimeException extends RuntimeException {
+    public InterruptedRuntimeException() {
+    }
+
+    public InterruptedRuntimeException(final String message) {
+        super(message);
+    }
+
+    public InterruptedRuntimeException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    public InterruptedRuntimeException(final Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f5b211f0/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java b/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java
index a270fe7..66790f7 100644
--- a/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java
+++ b/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutorTest.java
@@ -131,7 +131,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
     }
 
     @Test
-    public void shouldTimeoutScript() throws Exception {
+    public void shouldTimeoutSleepingScript() throws Exception {
         final AtomicBoolean successCalled = new AtomicBoolean(false);
         final AtomicBoolean failureCalled = new AtomicBoolean(false);
 
@@ -149,7 +149,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
             assertEquals(TimeoutException.class, ex.getCause().getClass());
         }
 
-        timeOutCount.await(2000, TimeUnit.MILLISECONDS);
+        assertTrue(timeOutCount.await(2000, TimeUnit.MILLISECONDS));
 
         assertFalse(successCalled.get());
         assertFalse(failureCalled.get());
@@ -158,6 +158,71 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
     }
 
     @Test
+    @LoadGraphWith(LoadGraphWith.GraphData.GRATEFUL)
+    public void shouldTimeoutIteratingTraversalScript() throws Exception {
+        final AtomicBoolean successCalled = new AtomicBoolean(false);
+        final AtomicBoolean failureCalled = new AtomicBoolean(false);
+
+        final CountDownLatch timeOutCount = new CountDownLatch(1);
+
+        final GremlinExecutor gremlinExecutor = GremlinExecutor.build()
+                .scriptEvaluationTimeout(1000)
+                .afterFailure((b, e) -> failureCalled.set(true))
+                .afterSuccess((b) -> successCalled.set(true))
+                .afterTimeout((b) -> timeOutCount.countDown()).create();
+        try {
+            final Bindings b = new SimpleBindings();
+            b.put("g", g);
+            gremlinExecutor.eval("g.V().out().out().out().out().out().out().out().out().out().out().out().iterate()", b).get();
+            fail("This script should have timed out with an exception");
+        } catch (Exception ex) {
+            assertEquals(TimeoutException.class, ex.getCause().getClass());
+        }
+
+        assertTrue(timeOutCount.await(2000, TimeUnit.MILLISECONDS));
+
+        assertFalse(successCalled.get());
+        assertFalse(failureCalled.get());
+        assertEquals(0, timeOutCount.getCount());
+        gremlinExecutor.close();
+    }
+
+    @Test
+    @LoadGraphWith(LoadGraphWith.GraphData.GRATEFUL)
+    public void shouldTimeoutIteratingTraversalScriptButBeSureInterruptedThreadCanBeReused() throws Exception {
+        final CountDownLatch timeOutCount = new CountDownLatch(1);
+
+        final ExecutorService evalExecutor = Executors.newSingleThreadExecutor(testingThreadFactory);
+        final GremlinExecutor gremlinExecutor = GremlinExecutor.build()
+                .executorService(evalExecutor)
+                .scriptEvaluationTimeout(1000)
+                .afterTimeout((b) -> timeOutCount.countDown()).create();
+
+        assertEquals(2, gremlinExecutor.eval("1+1").get());
+
+        try {
+            final Bindings b = new SimpleBindings();
+            b.put("g", g);
+            gremlinExecutor.eval("g.V().out().out().out().out().out().out().out().out().out().out().out().iterate()", b).get();
+            fail("This script should have timed out with an exception");
+        } catch (Exception ex) {
+            assertEquals(TimeoutException.class, ex.getCause().getClass());
+
+            // just make sure that interrupted thread is good to go again
+            assertEquals(2, gremlinExecutor.eval("1+1").get());
+            assertEquals(2, gremlinExecutor.eval("1+1").get());
+            assertEquals(2, gremlinExecutor.eval("1+1").get());
+        }
+
+        assertTrue(timeOutCount.await(2000, TimeUnit.MILLISECONDS));
+
+        assertEquals(0, timeOutCount.getCount());
+        gremlinExecutor.close();
+        evalExecutor.shutdown();
+        evalExecutor.awaitTermination(30000, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
     public void shouldCallFail() throws Exception {
         final AtomicBoolean timeoutCalled = new AtomicBoolean(false);
         final AtomicBoolean successCalled = new AtomicBoolean(false);
@@ -209,7 +274,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
         // then iterated in another.  note that Gremlin Server configures the script engine to auto-commit
         // after evaluation.  this basically tests the state of the Gremlin Server GremlinExecutor when
         // being used in sessionless mode
-        final ExecutorService evalExecutor = Executors.newFixedThreadPool(2, testingThreadFactory);
+        final ExecutorService evalExecutor = Executors.newSingleThreadExecutor(testingThreadFactory);
         final GremlinExecutor gremlinExecutor = GremlinExecutor.build()
                 .afterSuccess(b -> {
                     final Graph graph = (Graph) b.get("g");
@@ -244,7 +309,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
         // this test sort of simulates Gremlin Server interaction where a Traversal is eval'd in one Thread, but
         // then iterated in another.  this basically tests the state of the Gremlin Server GremlinExecutor when
         // being used in session mode
-        final ExecutorService evalExecutor = Executors.newFixedThreadPool(2, testingThreadFactory);
+        final ExecutorService evalExecutor = Executors.newSingleThreadExecutor(testingThreadFactory);
         final GremlinExecutor gremlinExecutor = GremlinExecutor.build().executorService(evalExecutor).create();
 
         final Map<String,Object> bindings = new HashMap<>();
@@ -406,7 +471,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
             gremlinExecutor.eval("c = new java.awt.Color(255, 255, 255)").get();
             fail("Should have failed security");
         } catch (Exception se) {
-            assertEquals(SecurityException.class, se.getCause().getCause().getCause().getCause().getClass());
+            assertEquals(SecurityException.class, se.getCause().getClass());
         } finally {
             gremlinExecutor.close();
         }
@@ -428,7 +493,7 @@ public class GremlinExecutorTest extends AbstractGremlinTest {
             gremlinExecutor.eval("c = 'new java.awt.Color(255, 255, 255)'").get();
             fail("Should have failed security");
         } catch (Exception se) {
-            assertEquals(SecurityException.class, se.getCause().getCause().getCause().getCause().getClass());
+            assertEquals(SecurityException.class, se.getCause().getClass());
         }
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f5b211f0/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java b/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java
index 5412163..6c4ef08 100644
--- a/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java
+++ b/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/groovy/jsr223/GremlinGroovyScriptEngineTest.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.groovy.NoImportCustomizerProvider;
 import org.apache.tinkerpop.gremlin.groovy.SecurityCustomizerProvider;
 import org.apache.tinkerpop.gremlin.process.T;
 import org.apache.tinkerpop.gremlin.process.Traversal;
+import org.apache.tinkerpop.gremlin.process.TraversalInterruptedException;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.StreamFactory;
@@ -54,6 +55,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.GRATEFUL;
 import static org.junit.Assert.*;
 
 /**
@@ -578,6 +580,35 @@ public class GremlinGroovyScriptEngineTest extends AbstractGremlinTest {
         }
     }
 
+    @Test
+    @LoadGraphWith(GRATEFUL)
+    public void shouldTimeoutOnEvalOfTraversalWhereIterateHasStarted() throws Exception {
+        // this is just a safety net test for groovy - see the gremlin-test/CoreTraversalTest for more
+        // complete testing of breaking out of iteration
+        final ScriptEngine engine = new GremlinGroovyScriptEngine();
+        final Bindings b = new SimpleBindings();
+        b.put("g", g);
+
+        final AtomicBoolean interrupted = new AtomicBoolean(false);
+
+        final Thread t = new Thread(() -> {
+            try {
+                engine.eval("g.V().out().out().out().out().out().out().out().out().out().out().out().iterate()", b);
+                fail("No way this should have completed in any reasonable time");
+            } catch (Exception ex) {
+                interrupted.set(ex.getCause().getCause().getClass().equals(TraversalInterruptedException.class));
+            }
+        });
+
+        t.start();
+
+        Thread.sleep(1000);
+        t.interrupt();
+        t.join();
+
+        assertTrue(interrupted.get());
+    }
+
     public static class DenyAll extends GroovyValueFilter {
         @Override
         public Object filter(final Object o) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f5b211f0/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
index 5bb21cd..20dfab9 100644
--- a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
+++ b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
@@ -18,8 +18,10 @@
  */
 package org.apache.tinkerpop.gremlin.groovy.engine;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
 import org.apache.tinkerpop.gremlin.groovy.plugin.GremlinPlugin;
+import org.apache.tinkerpop.gremlin.process.TraversalInterruptedException;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.javatuples.Pair;
@@ -44,6 +46,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -85,7 +88,7 @@ public class GremlinExecutor implements AutoCloseable {
     private final Consumer<Bindings> beforeEval;
     private final Consumer<Bindings> afterSuccess;
     private final Consumer<Bindings> afterTimeout;
-    private final BiConsumer<Bindings, Exception> afterFailure;
+    private final BiConsumer<Bindings, Throwable> afterFailure;
     private final Set<String> enabledPlugins;
     private final boolean suppliedExecutor;
     private final boolean suppliedScheduledExecutor;
@@ -94,7 +97,7 @@ public class GremlinExecutor implements AutoCloseable {
                             final long scriptEvaluationTimeout, final Bindings globalBindings,
                             final ExecutorService executorService, final ScheduledExecutorService scheduledExecutorService,
                             final Consumer<Bindings> beforeEval, final Consumer<Bindings> afterSuccess,
-                            final Consumer<Bindings> afterTimeout, final BiConsumer<Bindings, Exception> afterFailure,
+                            final Consumer<Bindings> afterTimeout, final BiConsumer<Bindings, Throwable> afterFailure,
                             final Set<String> enabledPlugins, final boolean suppliedExecutor, final boolean suppliedScheduledExecutor) {
         this.executorService = executorService;
         this.scheduledExecutorService = scheduledExecutorService;
@@ -143,36 +146,61 @@ public class GremlinExecutor implements AutoCloseable {
 
         logger.debug("Preparing to evaluate script - {} - in thread [{}]", script, Thread.currentThread().getName());
 
-        // select the gremlin threadpool to execute the script evaluation in
-        final AtomicBoolean abort = new AtomicBoolean(false);
-        final CompletableFuture<Object> evaluationFuture = CompletableFuture.supplyAsync(() -> {
-
-            final Bindings bindings = new SimpleBindings();
-            bindings.putAll(this.globalBindings);
-            bindings.putAll(boundVars);
+        final Bindings bindings = new SimpleBindings();
+        bindings.putAll(this.globalBindings);
+        bindings.putAll(boundVars);
+        beforeEval.accept(bindings);
 
+        final CompletableFuture<Object> evaluationFuture = new CompletableFuture<>();
+        final FutureTask<Void> f = new FutureTask<>(() -> {
             try {
                 logger.debug("Evaluating script - {} - in thread [{}]", script, Thread.currentThread().getName());
 
-                beforeEval.accept(bindings);
                 final Object o = scriptEngines.eval(script, bindings, lang);
 
-                if (abort.get())
-                    afterTimeout.accept(bindings);
-                else
-                    afterSuccess.accept(bindings);
+                afterSuccess.accept(bindings);
 
                 // apply a transformation before sending back the result - useful when trying to force serialization
-                // in the same thread that the eval took place given threadlocal nature of graphs as well as some
+                // in the same thread that the eval took place given ThreadLocal nature of graphs as well as some
                 // transactional constraints
-                return null == transformResult ? o : transformResult.apply(o);
+                evaluationFuture.complete(null == transformResult ? o : transformResult.apply(o));
             } catch (Exception ex) {
-                afterFailure.accept(bindings, ex);
-                throw new RuntimeException(ex);
+                final Throwable root = ExceptionUtils.getRootCause(ex);
+
+                // thread interruptions will typically come as the result of a timeout, so in those cases,
+                // check for that situation and convert to TimeoutException
+                if (root.getClass().equals(InterruptedException.class)
+                        || root.getClass().equals(TraversalInterruptedException.class))
+                    evaluationFuture.completeExceptionally(new TimeoutException(
+                            String.format("Script evaluation exceeded the configured threshold of %s ms for request [%s]: %s", scriptEvaluationTimeout, script, root.getMessage())));
+                else {
+                    afterFailure.accept(bindings, root);
+                    evaluationFuture.completeExceptionally(root);
+                }
             }
-        }, executorService);
 
-        scheduleTimeout(evaluationFuture, script, abort);
+            return null;
+        });
+
+        executorService.execute(f);
+
+        if (scriptEvaluationTimeout > 0) {
+            // Schedule a timeout in the thread pool for future execution
+            final ScheduledFuture<?> sf = scheduledExecutorService.schedule(() -> {
+                logger.info("Timing out script - {} - in thread [{}]", script, Thread.currentThread().getName());
+                if (!f.isDone()) {
+                    afterTimeout.accept(bindings);
+                    f.cancel(true);
+                }
+            }, scriptEvaluationTimeout, TimeUnit.MILLISECONDS);
+
+            // Cancel the scheduled timeout if the eval future is complete or the script evaluation failed
+            // with exception
+            evaluationFuture.handleAsync((v, t) -> {
+                logger.debug("Killing scheduled timeout on script evaluation as the eval completed (possibly with exception).");
+                return sf.cancel(true);
+            });
+        }
 
         return evaluationFuture;
     }
@@ -246,28 +274,6 @@ public class GremlinExecutor implements AutoCloseable {
         return future;
     }
 
-    private void scheduleTimeout(final CompletableFuture<Object> evaluationFuture, final String script, final AtomicBoolean abort) {
-        if (scriptEvaluationTimeout > 0) {
-            // Schedule a timeout in the threadpool for future execution - killing an eval is cheap
-            final ScheduledFuture<?> sf = scheduledExecutorService.schedule(() -> {
-                logger.info("Timing out script - {} - in thread [{}]", script, Thread.currentThread().getName());
-
-                if (!evaluationFuture.isDone()) {
-                    abort.set(true);
-                    evaluationFuture.completeExceptionally(new TimeoutException(
-                            String.format("Script evaluation exceeded the configured threshold of %s ms for request [%s]", scriptEvaluationTimeout, script)));
-                }
-            }, scriptEvaluationTimeout, TimeUnit.MILLISECONDS);
-
-            // Cancel the scheduled timeout if the eval future is complete or the script evaluation failed
-            // with exception
-            evaluationFuture.handleAsync((v, t) -> {
-                logger.debug("Killing scheduled timeout on script evaluation as the eval completed (possibly with exception).");
-                return sf.cancel(true);
-            });
-        }
-    }
-
     private ScriptEngines createScriptEngines() {
         // plugins already on the path - ones static to the classpath
         final List<GremlinPlugin> globalPlugins = new ArrayList<>();
@@ -372,7 +378,7 @@ public class GremlinExecutor implements AutoCloseable {
         };
         private Consumer<Bindings> afterTimeout = (b) -> {
         };
-        private BiConsumer<Bindings, Exception> afterFailure = (b, e) -> {
+        private BiConsumer<Bindings, Throwable> afterFailure = (b, e) -> {
         };
         private List<List<String>> use = new ArrayList<>();
         private Bindings globalBindings = new SimpleBindings();
@@ -470,7 +476,7 @@ public class GremlinExecutor implements AutoCloseable {
         /**
          * A {@link Consumer} to execute in the event of failure.
          */
-        public Builder afterFailure(final BiConsumer<Bindings, Exception> afterFailure) {
+        public Builder afterFailure(final BiConsumer<Bindings, Throwable> afterFailure) {
             this.afterFailure = afterFailure;
             return this;
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f5b211f0/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
index 5885544..4aacd68 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.FeatureRequirement;
 import org.apache.tinkerpop.gremlin.LoadGraphWith;
 import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
 import org.apache.tinkerpop.gremlin.process.Traversal;
+import org.apache.tinkerpop.gremlin.process.TraversalInterruptedException;
 import org.apache.tinkerpop.gremlin.process.graph.traversal.GraphTraversal;
 import org.apache.tinkerpop.gremlin.process.util.BulkSet;
 import org.apache.tinkerpop.gremlin.structure.Contains;
@@ -32,12 +33,18 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.junit.Test;
 
+import javax.script.Bindings;
+import javax.script.ScriptEngine;
+import javax.script.SimpleBindings;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.GRATEFUL;
 import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
 import static org.apache.tinkerpop.gremlin.process.graph.traversal.__.*;
 import static org.apache.tinkerpop.gremlin.structure.Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS;
@@ -201,4 +208,96 @@ public class CoreTraversalTest extends AbstractGremlinProcessTest {
         assertEquals(1, IteratorUtils.count(t));
         g.tx().rollback();
     }
+
+    @Test
+    @LoadGraphWith(GRATEFUL)
+    public void shouldTimeoutOnTraversalWhereIterateHasStarted() throws Exception {
+        final AtomicBoolean interrupted = new AtomicBoolean(false);
+
+        final Thread t = new Thread(() -> {
+            try {
+                g.V().out().out().out().out().out().out().out().out().out().out().out().iterate();
+                fail("No way this should have completed in any reasonable time");
+            } catch (Exception ex) {
+                interrupted.set(ex.getClass().equals(TraversalInterruptedException.class));
+            }
+        });
+
+        t.start();
+
+        Thread.sleep(1000);
+        t.interrupt();
+        t.join();
+
+        assertTrue(interrupted.get());
+    }
+
+    @Test
+    @LoadGraphWith(GRATEFUL)
+    public void shouldTimeoutOnTraversalWhereForEachRemainingHasStarted() throws Exception {
+        final AtomicBoolean interrupted = new AtomicBoolean(false);
+
+        final Thread t = new Thread(() -> {
+            try {
+                g.V().out().out().out().out().out().out().out().out().out().out().out().forEachRemaining(Object::toString);
+                fail("No way this should have completed in any reasonable time");
+            } catch (Exception ex) {
+                interrupted.set(ex.getClass().equals(TraversalInterruptedException.class));
+            }
+        });
+
+        t.start();
+
+        Thread.sleep(1000);
+        t.interrupt();
+        t.join();
+
+        assertTrue(interrupted.get());
+    }
+
+    @Test
+    @LoadGraphWith(GRATEFUL)
+    public void shouldTimeoutOnTraversalWhereNextingStarted() throws Exception {
+        final AtomicBoolean interrupted = new AtomicBoolean(false);
+
+        final Thread t = new Thread(() -> {
+            try {
+                g.V().out().out().out().out().out().out().out().out().out().out().out().next(Integer.MAX_VALUE);
+                fail("No way this should have completed in any reasonable time");
+            } catch (Exception ex) {
+                interrupted.set(ex.getClass().equals(TraversalInterruptedException.class));
+            }
+        });
+
+        t.start();
+
+        Thread.sleep(1000);
+        t.interrupt();
+        t.join();
+
+        assertTrue(interrupted.get());
+    }
+
+    @Test
+    @LoadGraphWith(GRATEFUL)
+    public void shouldTimeoutOnTraversalWhereFillStarted() throws Exception {
+        final AtomicBoolean interrupted = new AtomicBoolean(false);
+
+        final Thread t = new Thread(() -> {
+            try {
+                g.V().out().out().out().out().out().out().out().out().out().out().out().fill(new ArrayList<>());
+                fail("No way this should have completed in any reasonable time");
+            } catch (Exception ex) {
+                interrupted.set(ex.getClass().equals(TraversalInterruptedException.class));
+            }
+        });
+
+        t.start();
+
+        Thread.sleep(1000);
+        t.interrupt();
+        t.join();
+
+        assertTrue(interrupted.get());
+    }
 }