You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/05/04 15:03:54 UTC
[01/10] incubator-tinkerpop git commit: Removed another redundant
interruption check.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 3e7d19d65 -> 8fd0bae0f
Removed another redundant interruption check.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/24704bc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/24704bc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/24704bc5
Branch: refs/heads/master
Commit: 24704bc5b97f78b9f40d19c90e0e4848c54a2e8a
Parents: 400e276
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Wed May 4 06:39:12 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed May 4 06:43:35 2016 -0400
----------------------------------------------------------------------
.../process/computer/traversal/step/map/ComputerResultStep.java | 2 --
1 file changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/24704bc5/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java
index c8b56a2..b5fd8e8 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java
@@ -25,7 +25,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Property;
@@ -63,7 +62,6 @@ public final class ComputerResultStep<S> extends AbstractStep<ComputerResult, S>
@Override
protected Traverser.Admin<S> processNextStart() throws NoSuchElementException {
while (true) {
- if(Thread.interrupted()) throw new TraversalInterruptedException();
if (this.currentIterator.hasNext())
return this.currentIterator.next();
else {
[05/10] incubator-tinkerpop git commit: Added process tests for
traversal interruption
Posted by sp...@apache.org.
Added process tests for traversal interruption
Covered GraphStep, VertexStep and PropertiesStep which should yield some coverage to provider implementations.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3a5f738c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3a5f738c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3a5f738c
Branch: refs/heads/master
Commit: 3a5f738c5ef30711bce227cfb1946739fda9050c
Parents: 0d4c07f
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue Apr 19 19:09:20 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed May 4 06:43:35 2016 -0400
----------------------------------------------------------------------
.../util/TraversalInterruptedException.java | 18 +++
.../traversal/util/DefaultTraversalTest.java | 1 -
.../process/traversal/CoreTraversalTest.java | 130 +++++++++++++++++++
3 files changed, 148 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3a5f738c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalInterruptedException.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalInterruptedException.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalInterruptedException.java
index 216a762..f3db464 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalInterruptedException.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalInterruptedException.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.tinkerpop.gremlin.process.traversal.util;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3a5f738c/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversalTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversalTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversalTest.java
index c1a4c6b..6f6a6c6 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversalTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversalTest.java
@@ -69,7 +69,6 @@ public class DefaultTraversalTest {
startedIterating.countDown();
counter.incrementAndGet();
}).iterate();
- fail("Traversal should have been interrupted");
} catch (Exception ex) {
exceptionThrown.set(ex instanceof TraversalInterruptedException);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3a5f738c/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
index 930030e..200d94b 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
@@ -24,11 +24,13 @@ import org.apache.tinkerpop.gremlin.LoadGraphWith;
import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.hamcrest.CoreMatchers;
import org.junit.Test;
import java.util.HashSet;
@@ -36,8 +38,12 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.GRATEFUL;
import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.*;
import static org.apache.tinkerpop.gremlin.structure.Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS;
@@ -56,6 +62,130 @@ import static org.junit.Assert.*;
public class CoreTraversalTest extends AbstractGremlinProcessTest {
@Test
+ @LoadGraphWith(GRATEFUL)
+ public void shouldRespectThreadInterruptionInGraphStep() throws Exception {
+ final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+ final CountDownLatch startedIterating = new CountDownLatch(1);
+ final Thread t = new Thread(() -> {
+ try {
+ final Traversal traversal = g.V().sideEffect(traverser -> {
+ startedIterating.countDown();
+
+ // ensure that the whole traversal doesn't iterate out before we get a chance to interrupt
+ if (startedIterating.getCount() == 0) {
+ try {
+ Thread.sleep(3000);
+ } catch (Exception ignored) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ });
+ traversal.iterate();
+ } catch (Exception ex) {
+ exceptionThrown.set(ex instanceof TraversalInterruptedException);
+ }
+ });
+
+ t.start();
+
+ // total time for test should not exceed 5 seconds - this prevents the test from just hanging and allows
+ // it to finish with failure
+ assertThat(startedIterating.await(5000, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
+
+ t.interrupt();
+ t.join();
+
+ // ensure that some but not all of the traversal was iterated and that the right exception was tossed
+ assertThat(exceptionThrown.get(), CoreMatchers.is(true));
+ }
+
+ @Test
+ @LoadGraphWith(GRATEFUL)
+ public void shouldRespectThreadInterruptionInVertexStep() throws Exception {
+ final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+ final CountDownLatch startedIterating = new CountDownLatch(1);
+ final Thread t = new Thread(() -> {
+ try {
+ final AtomicBoolean first = new AtomicBoolean(true);
+ final Traversal traversal = g.V().sideEffect(traverser -> {
+ // let the first iteration flow through
+ if (!first.compareAndSet(true, false)) {
+ // ensure that the whole traversal doesn't iterate out before we get a chance to interrupt
+ // the next iteration should stop so we can force the interrupt to be handled by VertexStep
+ try {
+ Thread.sleep(3000);
+ } catch (Exception ignored) {
+ // make sure that the interrupt propagates in case the interrupt occurs during sleep.
+ // this should ensure VertexStep gets to try to throw the TraversalInterruptedException
+ Thread.currentThread().interrupt();
+ }
+ }
+ }).out().sideEffect(traverser -> {
+ startedIterating.countDown();
+ });
+ traversal.iterate();
+ } catch (Exception ex) {
+ exceptionThrown.set(ex instanceof TraversalInterruptedException);
+ }
+ });
+
+ t.start();
+
+ // total time for test should not exceed 5 seconds - this prevents the test from just hanging and allows
+ // it to finish with failure
+ assertThat(startedIterating.await(5000, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
+
+ t.interrupt();
+ t.join();
+
+ // ensure that some but not all of the traversal was iterated and that the right exception was tossed
+ assertThat(exceptionThrown.get(), CoreMatchers.is(true));
+ }
+
+ @Test
+ @LoadGraphWith(GRATEFUL)
+ public void shouldRespectThreadInterruptionInPropertyStep() throws Exception {
+ final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+ final CountDownLatch startedIterating = new CountDownLatch(1);
+ final Thread t = new Thread(() -> {
+ try {
+ final AtomicBoolean first = new AtomicBoolean(true);
+ final Traversal traversal = g.V().sideEffect(traverser -> {
+ // let the first iteration flow through
+ if (!first.compareAndSet(true, false)) {
+ // ensure that the whole traversal doesn't iterate out before we get a chance to interrupt
+ // the next iteration should stop so we can force the interrupt to be handled by PropertyStep
+ try {
+ Thread.sleep(3000);
+ } catch (Exception ignored) {
+ // make sure that the interrupt propagates in case the interrupt occurs during sleep.
+ // this should ensure PropertyStep gets to try to throw the TraversalInterruptedException
+ Thread.currentThread().interrupt();
+ }
+ }
+ }).properties().sideEffect(traverser -> {
+ startedIterating.countDown();
+ });
+ traversal.iterate();
+ } catch (Exception ex) {
+ exceptionThrown.set(ex instanceof TraversalInterruptedException);
+ }
+ });
+
+ t.start();
+
+ // total time for test should not exceed 5 seconds - this prevents the test from just hanging and allows
+ // it to finish with failure
+ assertThat(startedIterating.await(5000, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
+
+ t.interrupt();
+ t.join();
+
+ // ensure that some but not all of the traversal was iterated and that the right exception was tossed
+ assertThat(exceptionThrown.get(), CoreMatchers.is(true));
+ }
+
+ @Test
@LoadGraphWith
public void shouldNeverPropagateANoBulkTraverser() {
assertFalse(g.V().dedup().sideEffect(t -> t.asAdmin().setBulk(0)).hasNext());
[09/10] incubator-tinkerpop git commit: Updated changelog.
Posted by sp...@apache.org.
Updated changelog.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/a447a249
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/a447a249
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/a447a249
Branch: refs/heads/master
Commit: a447a2495a2d20acf2620823fa65a86ea866751e
Parents: 24704bc
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Wed May 4 06:45:39 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed May 4 06:45:39 2016 -0400
----------------------------------------------------------------------
CHANGELOG.asciidoc | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a447a249/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 338823d..47ca781 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -33,6 +33,7 @@ TinkerPop 3.2.1 (NOT OFFICIALLY RELEASED YET)
* Added "help" support to Gremlin Console with the `-h` flag.
* Added options to better control verbosity of Gremlin Console output with `-Q`, `-V` and `-D`.
* Deprecated the `ScriptExecutor` - the `-e` option to `gremlin.sh` is now handled by `Console`.
+* `Traversal` now allows cancellation with `Thread.interrupt()`.
* Added a Gremlin language variant tutorial teaching people how to embed Gremlin in a host programming language.
[[release-3.2.0-incubating]]
[03/10] incubator-tinkerpop git commit: OLAP Traversals now support
traversal interruption.
Posted by sp...@apache.org.
OLAP Traversals now support traversal interruption.
Implemented for SparkGraphComputer and TinkerGraphComputer.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/173034eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/173034eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/173034eb
Branch: refs/heads/master
Commit: 173034eb51da16da9d960dc48b92f792214ab04c
Parents: 8b4e670
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Apr 22 15:26:35 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed May 4 06:43:35 2016 -0400
----------------------------------------------------------------------
.../traversal/step/map/VertexProgramStep.java | 18 +++-
gremlin-driver/pom.xml | 1 -
.../gremlin/process/ProcessComputerSuite.java | 4 +
.../TraversalInterruptionComputerTest.java | 97 ++++++++++++++++++++
.../traversal/TraversalInterruptionTest.java | 2 +-
.../gremlin/hadoop/structure/HadoopGraph.java | 5 +
pom.xml | 5 +
.../process/computer/SparkGraphComputer.java | 24 ++++-
tinkergraph-gremlin/pom.xml | 4 +
.../process/computer/TinkerGraphComputer.java | 30 +++++-
.../process/computer/TinkerWorkerPool.java | 12 ++-
11 files changed, 187 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
index ab095f2..8fe7ed0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
@@ -36,9 +36,11 @@ import org.apache.tinkerpop.gremlin.structure.Graph;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Stephen Mallette (http://stephen.genoprime.com)
*/
public abstract class VertexProgramStep extends AbstractStep<ComputerResult, ComputerResult> implements VertexComputing {
@@ -52,21 +54,31 @@ public abstract class VertexProgramStep extends AbstractStep<ComputerResult, Com
@Override
protected Traverser.Admin<ComputerResult> processNextStart() throws NoSuchElementException {
+ Future<ComputerResult> future = null;
try {
if (this.first && this.getPreviousStep() instanceof EmptyStep) {
this.first = false;
final Graph graph = this.getTraversal().getGraph().get();
- final ComputerResult result = this.generateComputer(graph).program(this.generateProgram(graph)).submit().get();
+ future = this.generateComputer(graph).program(this.generateProgram(graph)).submit();
+ final ComputerResult result = future.get();
this.processMemorySideEffects(result.memory());
return this.getTraversal().getTraverserGenerator().generate(result, this, 1l);
} else {
final Traverser.Admin<ComputerResult> traverser = this.starts.next();
final Graph graph = traverser.get().graph();
- final ComputerResult result = this.generateComputer(graph).program(this.generateProgram(graph)).submit().get();
+ future = this.generateComputer(graph).program(this.generateProgram(graph)).submit();
+ final ComputerResult result = future.get();
this.processMemorySideEffects(result.memory());
return traverser.split(result, this);
}
- } catch (final InterruptedException | ExecutionException e) {
+ } catch (InterruptedException ie) {
+ // the thread running the traversal took an interruption while waiting on the call to future.get().
+ // the future should then be cancelled with interruption so that the GraphComputer that created
+ // the future knows we don't care about it anymore. The GraphComputer should attempt to respect this
+ // cancellation request.
+ if (future != null) future.cancel(true);
+ throw new TraversalInterruptedException();
+ } catch (ExecutionException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/gremlin-driver/pom.xml
----------------------------------------------------------------------
diff --git a/gremlin-driver/pom.xml b/gremlin-driver/pom.xml
index 515f765..ecbf25d 100644
--- a/gremlin-driver/pom.xml
+++ b/gremlin-driver/pom.xml
@@ -60,7 +60,6 @@ limitations under the License.
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
- <version>3.3.1</version>
</dependency>
<!-- TEST -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessComputerSuite.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessComputerSuite.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessComputerSuite.java
index 8f85e62..4c183dc 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessComputerSuite.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessComputerSuite.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVerte
import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgramTest;
import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgramTest;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalEngine;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionComputerTest;
import org.apache.tinkerpop.gremlin.process.traversal.step.branch.BranchTest;
import org.apache.tinkerpop.gremlin.process.traversal.step.branch.ChooseTest;
import org.apache.tinkerpop.gremlin.process.traversal.step.branch.OptionalTest;
@@ -182,6 +183,9 @@ public class ProcessComputerSuite extends AbstractGremlinSuite {
SubgraphTest.Traversals.class,
TreeTest.Traversals.class,
+ // compliance
+ TraversalInterruptionComputerTest.class,
+
// algorithms
PageRankVertexProgramTest.class,
PeerPressureVertexProgramTest.class,
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionComputerTest.java
new file mode 100644
index 0000000..c3a1042
--- /dev/null
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionComputerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal;
+
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
+
+import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.GRATEFUL;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+@RunWith(Parameterized.class)
+public class TraversalInterruptionComputerTest extends AbstractGremlinProcessTest {
+
+ @Parameterized.Parameters(name = "expectInterruption({0})")
+ public static Iterable<Object[]> data() {
+ return Arrays.asList(new Object[][]{
+ {"g_V", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V()},
+ {"g_V_out", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V().out()},
+ {"g_V_outE", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V().outE()},
+ {"g_V_in", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V().in()},
+ {"g_V_inE", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V().inE()},
+ {"g_V_properties", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V().properties()},
+ {"g_E", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.E()},
+ {"g_E_outV", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.E().outV()},
+ {"g_E_inV", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.E().inV()},
+ {"g_E_properties", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.E().properties()},
+ });
+ }
+
+ @Parameterized.Parameter(value = 0)
+ public String name;
+
+ @Parameterized.Parameter(value = 1)
+ public Function<GraphTraversalSource,GraphTraversal<?,?>> traversalMaker;
+
+ @Test
+ @LoadGraphWith(GRATEFUL)
+ public void shouldRespectThreadInterruptionInVertexStep() throws Exception {
+ final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+ final CountDownLatch startedIterating = new CountDownLatch(1);
+ final Thread t = new Thread(() -> {
+ try {
+ final Traversal traversal = traversalMaker.apply(g).sideEffect(traverser -> {
+ startedIterating.countDown();
+ });
+ traversal.iterate();
+ } catch (Exception ex) {
+ exceptionThrown.set(ex instanceof TraversalInterruptedException);
+ }
+ }, name);
+
+ t.start();
+
+ // total time for test should not exceed 5 seconds - this prevents the test from just hanging and allows
+ // it to finish with failure
+ assertThat(startedIterating.await(5000, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
+
+ t.interrupt();
+ t.join();
+
+ // ensure that some but not all of the traversal was iterated and that the right exception was tossed
+ assertThat(exceptionThrown.get(), CoreMatchers.is(true));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
index e9d584f..c508df2 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
@@ -97,7 +97,7 @@ public class TraversalInterruptionTest extends AbstractGremlinProcessTest {
} catch (Exception ex) {
exceptionThrown.set(ex instanceof TraversalInterruptedException);
}
- });
+ }, name);
t.start();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index 3dd0d9a..08bbeeb 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -181,6 +181,11 @@ import java.util.stream.Stream;
method = "shouldStartAndEndWorkersForVertexProgramAndMapReduce",
reason = "Spark executes map and combine in a lazy fashion and thus, fails the blocking aspect of this test",
computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"})
+@Graph.OptOut(
+ test = "org.apache.tinkerpop.gremlin.process.computer.TraversalInterruptionComputerTest",
+ method = "*",
+ reason = "This test makes an use of a sideEffect to enforce when a thread interruption is triggered and thus isn't applicable to HadoopGraph",
+ computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"})
public final class HadoopGraph implements Graph {
public static final Logger LOGGER = LoggerFactory.getLogger(HadoopGraph.class);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 28e932c..bd3584d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -499,6 +499,11 @@ limitations under the License.
<version>3.2.2</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.3.1</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 928a880..bc8bc50 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.spark.process.computer;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.FileConfiguration;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -51,6 +52,7 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
@@ -67,9 +69,11 @@ import org.apache.tinkerpop.gremlin.structure.io.Storage;
import java.io.File;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
import java.util.stream.Stream;
/**
@@ -77,6 +81,15 @@ import java.util.stream.Stream;
*/
public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
+
+ private final ThreadFactory threadFactoryBoss = new BasicThreadFactory.Builder().namingPattern(SparkGraphComputer.class.getSimpleName() + "-boss").build();
+
+ /**
+ * An {@code ExecutorService} that schedules up background work. Since a {@link GraphComputer} is only used once
+ * for a {@link VertexProgram} a single threaded executor is sufficient.
+ */
+ private final ExecutorService computerService = Executors.newSingleThreadExecutor(threadFactoryBoss);
+
private final org.apache.commons.configuration.Configuration sparkConfiguration;
private boolean workersSet = false;
@@ -110,7 +123,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
private Future<ComputerResult> submitWithExecutor(Executor exec) {
// create the completable future
- return CompletableFuture.<ComputerResult>supplyAsync(() -> {
+ return computerService.submit(() -> {
final long startTime = System.currentTimeMillis();
// apache and hadoop configurations that are used throughout the graph computer computation
final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
@@ -219,7 +232,6 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
if (!inputFromSpark || partitioned || filtered)
loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
-
////////////////////////////////
// process the vertex program //
////////////////////////////////
@@ -235,6 +247,10 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
// execute the vertex program
while (true) {
+ if (Thread.interrupted()) {
+ sparkContext.cancelAllJobs();
+ throw new TraversalInterruptedException();
+ }
memory.setInExecute(true);
viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
memory.setInExecute(false);
@@ -312,7 +328,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
if (!apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
Spark.close();
}
- }, exec);
+ });
}
/////////////////
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/tinkergraph-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/pom.xml b/tinkergraph-gremlin/pom.xml
index dd48df6..0f3cecd 100644
--- a/tinkergraph-gremlin/pom.xml
+++ b/tinkergraph-gremlin/pom.xml
@@ -31,6 +31,10 @@ limitations under the License.
<artifactId>gremlin-core</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
<!-- provided scope for gremlin-groovy because it is only used for purpose of scriptengine plugin in
the console and server - in which case that jar should already be present -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 5fb477c..2628ff1 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
@@ -27,6 +28,7 @@ import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -41,8 +43,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -62,6 +66,14 @@ public final class TinkerGraphComputer implements GraphComputer {
private int workers = Runtime.getRuntime().availableProcessors();
private final GraphFilter graphFilter = new GraphFilter();
+ private final ThreadFactory threadFactoryBoss = new BasicThreadFactory.Builder().namingPattern(TinkerGraphComputer.class.getSimpleName() + "-boss").build();
+
+ /**
+ * An {@code ExecutorService} that schedules up background work. Since a {@link GraphComputer} is only used once
+ * for a {@link VertexProgram} a single threaded executor is sufficient.
+ */
+ private final ExecutorService computerService = Executors.newSingleThreadExecutor(threadFactoryBoss);
+
public TinkerGraphComputer(final TinkerGraph graph) {
this.graph = graph;
}
@@ -134,15 +146,17 @@ public final class TinkerGraphComputer implements GraphComputer {
// initialize the memory
this.memory = new TinkerMemory(this.vertexProgram, this.mapReducers);
- return CompletableFuture.<ComputerResult>supplyAsync(() -> {
+ return computerService.submit(() -> {
final long time = System.currentTimeMillis();
final TinkerGraphComputerView view;
- try (final TinkerWorkerPool workers = new TinkerWorkerPool(this.workers)) {
+ final TinkerWorkerPool workers = new TinkerWorkerPool(this.workers);
+ try {
if (null != this.vertexProgram) {
view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, this.vertexProgram.getVertexComputeKeys());
// execute the vertex program
this.vertexProgram.setup(this.memory);
while (true) {
+ if (Thread.interrupted()) throw new TraversalInterruptedException();
this.memory.completeSubRound();
workers.setVertexProgram(this.vertexProgram);
final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
@@ -150,6 +164,7 @@ public final class TinkerGraphComputer implements GraphComputer {
vertexProgram.workerIterationStart(this.memory.asImmutable());
while (true) {
final Vertex vertex = vertices.next();
+ if (Thread.interrupted()) throw new TraversalInterruptedException();
if (null == vertex) break;
vertexProgram.execute(
ComputerGraph.vertexProgram(vertex, vertexProgram),
@@ -182,6 +197,7 @@ public final class TinkerGraphComputer implements GraphComputer {
workers.executeMapReduce(workerMapReduce -> {
workerMapReduce.workerStart(MapReduce.Stage.MAP);
while (true) {
+ if (Thread.interrupted()) throw new TraversalInterruptedException();
final Vertex vertex = vertices.next();
if (null == vertex) break;
workerMapReduce.map(ComputerGraph.mapReduce(vertex), mapEmitter);
@@ -198,6 +214,7 @@ public final class TinkerGraphComputer implements GraphComputer {
workers.executeMapReduce(workerMapReduce -> {
workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
while (true) {
+ if (Thread.interrupted()) throw new TraversalInterruptedException();
final Map.Entry<?, Queue<?>> entry = keyValues.next();
if (null == entry) break;
workerMapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter);
@@ -217,9 +234,14 @@ public final class TinkerGraphComputer implements GraphComputer {
final Graph resultGraph = view.processResultGraphPersist(this.resultGraph, this.persist);
TinkerHelper.dropGraphComputerView(this.graph); // drop the view from the original source graph
return new DefaultComputerResult(resultGraph, this.memory.asImmutable());
-
+ } catch (InterruptedException ie) {
+ workers.closeNow();
+ throw new TraversalInterruptedException();
} catch (Exception ex) {
+ workers.closeNow();
throw new RuntimeException(ex);
+ } finally {
+ workers.close();
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/173034eb/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index e9341b4..3d851bf 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -59,7 +59,7 @@ public final class TinkerWorkerPool implements AutoCloseable {
this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers);
}
- public void executeVertexProgram(final Consumer<VertexProgram> worker) {
+ public void executeVertexProgram(final Consumer<VertexProgram> worker) throws InterruptedException {
for (int i = 0; i < this.numberOfWorkers; i++) {
this.completionService.submit(() -> {
final VertexProgram vp = this.vertexProgramPool.take();
@@ -71,13 +71,15 @@ public final class TinkerWorkerPool implements AutoCloseable {
for (int i = 0; i < this.numberOfWorkers; i++) {
try {
this.completionService.take().get();
+ } catch (InterruptedException ie) {
+ throw ie;
} catch (final Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
- public void executeMapReduce(final Consumer<MapReduce> worker) {
+ public void executeMapReduce(final Consumer<MapReduce> worker) throws InterruptedException {
for (int i = 0; i < this.numberOfWorkers; i++) {
this.completionService.submit(() -> {
final MapReduce mr = this.mapReducePool.take();
@@ -89,12 +91,18 @@ public final class TinkerWorkerPool implements AutoCloseable {
for (int i = 0; i < this.numberOfWorkers; i++) {
try {
this.completionService.take().get();
+ } catch (InterruptedException ie) {
+ throw ie;
} catch (final Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
+ public void closeNow() throws Exception {
+ this.workerPool.shutdownNow();
+ }
+
@Override
public void close() throws Exception {
this.workerPool.shutdown();
[08/10] incubator-tinkerpop git commit: Refactored Traversal
interruption tests.
Posted by sp...@apache.org.
Refactored Traversal interruption tests.
Provided better coverage and easier maintenance by making the tests parameterized.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/8b4e670c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/8b4e670c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/8b4e670c
Branch: refs/heads/master
Commit: 8b4e670c268f3522457454ffed60b8d8b37da517
Parents: 3a5f738
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Wed Apr 20 07:52:01 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed May 4 06:43:35 2016 -0400
----------------------------------------------------------------------
.../process/AbstractGremlinProcessTest.java | 8 +-
.../gremlin/process/ProcessStandardSuite.java | 2 +
.../process/traversal/CoreTraversalTest.java | 130 -------------------
.../traversal/TraversalInterruptionTest.java | 114 ++++++++++++++++
4 files changed, 123 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8b4e670c/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/AbstractGremlinProcessTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/AbstractGremlinProcessTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/AbstractGremlinProcessTest.java
index 0c0f19d..201822c 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/AbstractGremlinProcessTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/AbstractGremlinProcessTest.java
@@ -72,10 +72,16 @@ public abstract class AbstractGremlinProcessTest extends AbstractGremlinTest {
try {
// ignore tests that aren't supported by a specific TraversalEngine
- final IgnoreEngine ignoreEngine = this.getClass().getMethod(name.getMethodName()).getAnnotation(IgnoreEngine.class);
+ final String testName = name.getMethodName();
+
+ // tests that are parameterized have a square bracket with parameterized name appended to the actual
+ // test method name. have to strip that off so that reflection can find it
+ final String methodName = testName.contains("[") ? testName.substring(0, testName.indexOf('[')) : testName;
+ final IgnoreEngine ignoreEngine = this.getClass().getMethod(methodName).getAnnotation(IgnoreEngine.class);
if (ignoreEngine != null)
assumeTrue(String.format("This test is ignored for %s", ignoreEngine.value()), !ignoreEngine.value().equals(GraphManager.getTraversalEngineType()));
} catch (NoSuchMethodException nsme) {
+ // some tests are parameterized
throw new RuntimeException(String.format("Could not find test method %s in test case %s", name.getMethodName(), this.getClass().getName()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8b4e670c/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessStandardSuite.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessStandardSuite.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessStandardSuite.java
index 5bd1f5a..644555d 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessStandardSuite.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/ProcessStandardSuite.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process;
import org.apache.tinkerpop.gremlin.AbstractGremlinSuite;
import org.apache.tinkerpop.gremlin.process.traversal.CoreTraversalTest;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalEngine;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest;
import org.apache.tinkerpop.gremlin.process.traversal.step.branch.BranchTest;
import org.apache.tinkerpop.gremlin.process.traversal.step.branch.ChooseTest;
import org.apache.tinkerpop.gremlin.process.traversal.step.branch.OptionalTest;
@@ -173,6 +174,7 @@ public class ProcessStandardSuite extends AbstractGremlinSuite {
// compliance
CoreTraversalTest.class,
+ TraversalInterruptionTest.class,
// decorations
ElementIdStrategyProcessTest.class,
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8b4e670c/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
index 200d94b..930030e 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
@@ -24,13 +24,11 @@ import org.apache.tinkerpop.gremlin.LoadGraphWith;
import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.hamcrest.CoreMatchers;
import org.junit.Test;
import java.util.HashSet;
@@ -38,12 +36,8 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.GRATEFUL;
import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.*;
import static org.apache.tinkerpop.gremlin.structure.Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS;
@@ -62,130 +56,6 @@ import static org.junit.Assert.*;
public class CoreTraversalTest extends AbstractGremlinProcessTest {
@Test
- @LoadGraphWith(GRATEFUL)
- public void shouldRespectThreadInterruptionInGraphStep() throws Exception {
- final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
- final CountDownLatch startedIterating = new CountDownLatch(1);
- final Thread t = new Thread(() -> {
- try {
- final Traversal traversal = g.V().sideEffect(traverser -> {
- startedIterating.countDown();
-
- // ensure that the whole traversal doesn't iterate out before we get a chance to interrupt
- if (startedIterating.getCount() == 0) {
- try {
- Thread.sleep(3000);
- } catch (Exception ignored) {
- Thread.currentThread().interrupt();
- }
- }
- });
- traversal.iterate();
- } catch (Exception ex) {
- exceptionThrown.set(ex instanceof TraversalInterruptedException);
- }
- });
-
- t.start();
-
- // total time for test should not exceed 5 seconds - this prevents the test from just hanging and allows
- // it to finish with failure
- assertThat(startedIterating.await(5000, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
-
- t.interrupt();
- t.join();
-
- // ensure that some but not all of the traversal was iterated and that the right exception was tossed
- assertThat(exceptionThrown.get(), CoreMatchers.is(true));
- }
-
- @Test
- @LoadGraphWith(GRATEFUL)
- public void shouldRespectThreadInterruptionInVertexStep() throws Exception {
- final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
- final CountDownLatch startedIterating = new CountDownLatch(1);
- final Thread t = new Thread(() -> {
- try {
- final AtomicBoolean first = new AtomicBoolean(true);
- final Traversal traversal = g.V().sideEffect(traverser -> {
- // let the first iteration flow through
- if (!first.compareAndSet(true, false)) {
- // ensure that the whole traversal doesn't iterate out before we get a chance to interrupt
- // the next iteration should stop so we can force the interrupt to be handled by VertexStep
- try {
- Thread.sleep(3000);
- } catch (Exception ignored) {
- // make sure that the interrupt propagates in case the interrupt occurs during sleep.
- // this should ensure VertexStep gets to try to throw the TraversalInterruptedException
- Thread.currentThread().interrupt();
- }
- }
- }).out().sideEffect(traverser -> {
- startedIterating.countDown();
- });
- traversal.iterate();
- } catch (Exception ex) {
- exceptionThrown.set(ex instanceof TraversalInterruptedException);
- }
- });
-
- t.start();
-
- // total time for test should not exceed 5 seconds - this prevents the test from just hanging and allows
- // it to finish with failure
- assertThat(startedIterating.await(5000, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
-
- t.interrupt();
- t.join();
-
- // ensure that some but not all of the traversal was iterated and that the right exception was tossed
- assertThat(exceptionThrown.get(), CoreMatchers.is(true));
- }
-
- @Test
- @LoadGraphWith(GRATEFUL)
- public void shouldRespectThreadInterruptionInPropertyStep() throws Exception {
- final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
- final CountDownLatch startedIterating = new CountDownLatch(1);
- final Thread t = new Thread(() -> {
- try {
- final AtomicBoolean first = new AtomicBoolean(true);
- final Traversal traversal = g.V().sideEffect(traverser -> {
- // let the first iteration flow through
- if (!first.compareAndSet(true, false)) {
- // ensure that the whole traversal doesn't iterate out before we get a chance to interrupt
- // the next iteration should stop so we can force the interrupt to be handled by PropertyStep
- try {
- Thread.sleep(3000);
- } catch (Exception ignored) {
- // make sure that the interrupt propagates in case the interrupt occurs during sleep.
- // this should ensure PropertyStep gets to try to throw the TraversalInterruptedException
- Thread.currentThread().interrupt();
- }
- }
- }).properties().sideEffect(traverser -> {
- startedIterating.countDown();
- });
- traversal.iterate();
- } catch (Exception ex) {
- exceptionThrown.set(ex instanceof TraversalInterruptedException);
- }
- });
-
- t.start();
-
- // total time for test should not exceed 5 seconds - this prevents the test from just hanging and allows
- // it to finish with failure
- assertThat(startedIterating.await(5000, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
-
- t.interrupt();
- t.join();
-
- // ensure that some but not all of the traversal was iterated and that the right exception was tossed
- assertThat(exceptionThrown.get(), CoreMatchers.is(true));
- }
-
- @Test
@LoadGraphWith
public void shouldNeverPropagateANoBulkTraverser() {
assertFalse(g.V().dedup().sideEffect(t -> t.asAdmin().setBulk(0)).hasNext());
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8b4e670c/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
new file mode 100644
index 0000000..e9d584f
--- /dev/null
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal;
+
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
+
+import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.GRATEFUL;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+@RunWith(Parameterized.class)
+public class TraversalInterruptionTest extends AbstractGremlinProcessTest {
+
+ @Parameterized.Parameters(name = "expectInterruption({0})")
+ public static Iterable<Object[]> data() {
+ return Arrays.asList(new Object[][]{
+ {"g_V", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V(), (UnaryOperator<GraphTraversal<?,?>>) t -> t},
+ {"g_V_out", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V(), (UnaryOperator<GraphTraversal<?,?>>) t -> t.out()},
+ {"g_V_outE", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V(), (UnaryOperator<GraphTraversal<?,?>>) t -> t.outE()},
+ {"g_V_in", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V(), (UnaryOperator<GraphTraversal<?,?>>) t -> t.in()},
+ {"g_V_inE", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V(), (UnaryOperator<GraphTraversal<?,?>>) t -> t.inE()},
+ {"g_V_properties", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.V(), (UnaryOperator<GraphTraversal<?,?>>) t -> t.properties()},
+ {"g_E", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.E(), (UnaryOperator<GraphTraversal<?,?>>) t -> t},
+ {"g_E_outV", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.E(), (UnaryOperator<GraphTraversal<?,?>>) t -> t.outV()},
+ {"g_E_inV", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.E(), (UnaryOperator<GraphTraversal<?,?>>) t -> t.inV()},
+ {"g_E_properties", (Function<GraphTraversalSource, GraphTraversal<?,?>>) g -> g.E(), (UnaryOperator<GraphTraversal<?,?>>) t -> t.properties()},
+ });
+ }
+
+ @Parameterized.Parameter(value = 0)
+ public String name;
+
+ @Parameterized.Parameter(value = 1)
+ public Function<GraphTraversalSource,GraphTraversal<?,?>> traversalBeforePause;
+
+ @Parameterized.Parameter(value = 2)
+ public UnaryOperator<GraphTraversal<?,?>> traversalAfterPause;
+
+ @Test
+ @LoadGraphWith(GRATEFUL)
+ public void shouldRespectThreadInterruptionInVertexStep() throws Exception {
+ final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+ final CountDownLatch startedIterating = new CountDownLatch(1);
+ final Thread t = new Thread(() -> {
+ try {
+ final AtomicBoolean first = new AtomicBoolean(true);
+ final Traversal traversal = traversalAfterPause.apply(traversalBeforePause.apply(g).sideEffect(traverser -> {
+ // let the first iteration flow through
+ if (!first.compareAndSet(true, false)) {
+ // ensure that the whole traversal doesn't iterate out before we get a chance to interrupt
+ // the next iteration should stop so we can force the interrupt to be handled by VertexStep
+ try {
+ Thread.sleep(3000);
+ } catch (Exception ignored) {
+ // make sure that the interrupt propagates in case the interrupt occurs during sleep.
+ // this should ensure VertexStep gets to try to throw the TraversalInterruptedException
+ Thread.currentThread().interrupt();
+ }
+ }
+ })).sideEffect(traverser -> {
+ startedIterating.countDown();
+ });
+ traversal.iterate();
+ } catch (Exception ex) {
+ exceptionThrown.set(ex instanceof TraversalInterruptedException);
+ }
+ });
+
+ t.start();
+
+ // total time for test should not exceed 5 seconds - this prevents the test from just hanging and allows
+ // it to finish with failure
+ assertThat(startedIterating.await(5000, TimeUnit.MILLISECONDS), CoreMatchers.is(true));
+
+ t.interrupt();
+ t.join();
+
+ // ensure that some but not all of the traversal was iterated and that the right exception was tossed
+ assertThat(exceptionThrown.get(), CoreMatchers.is(true));
+ }
+}
[06/10] incubator-tinkerpop git commit: Removed redundant interrupt
checks.
Posted by sp...@apache.org.
Removed redundant interrupt checks.
Interruption checks in AbstractStep should be enough to cover all of these checks.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/400e2763
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/400e2763
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/400e2763
Branch: refs/heads/master
Commit: 400e2763f51b80c2fd301b36ae5753b563d0092c
Parents: f69e991
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue May 3 07:36:51 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed May 4 06:43:35 2016 -0400
----------------------------------------------------------------------
.../tinkerpop/gremlin/process/traversal/step/branch/LocalStep.java | 2 --
.../gremlin/process/traversal/step/filter/FilterStep.java | 2 --
.../tinkerpop/gremlin/process/traversal/step/map/FlatMapStep.java | 2 --
.../tinkerpop/gremlin/process/traversal/step/map/GraphStep.java | 2 --
.../tinkerpop/gremlin/process/traversal/step/map/MatchStep.java | 2 --
.../gremlin/process/traversal/step/map/NoOpBarrierStep.java | 2 --
.../gremlin/process/traversal/step/util/ComputerAwareStep.java | 2 --
7 files changed, 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/400e2763/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/LocalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/LocalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/LocalStep.java
index e0a6520..e2affd4 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/LocalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/LocalStep.java
@@ -24,7 +24,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import java.util.Collections;
@@ -62,7 +61,6 @@ public final class LocalStep<S, E> extends AbstractStep<S, E> implements Travers
this.localTraversal.addStart(this.starts.next());
}
while (true) {
- if(Thread.interrupted()) throw new TraversalInterruptedException();
if (this.localTraversal.hasNext())
return this.localTraversal.getEndStep().next();
else if (this.starts.hasNext()) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/400e2763/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/FilterStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/FilterStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/FilterStep.java
index d264ec7..e07d951 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/FilterStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/FilterStep.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.process.traversal.step.filter;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -35,7 +34,6 @@ public abstract class FilterStep<S> extends AbstractStep<S, S> {
@Override
protected Traverser.Admin<S> processNextStart() {
while (true) {
- if(Thread.interrupted()) throw new TraversalInterruptedException();
final Traverser.Admin<S> traverser = this.starts.next();
if (this.filter(traverser))
return traverser;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/400e2763/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FlatMapStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FlatMapStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FlatMapStep.java
index ea66696..3d7dc24 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FlatMapStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FlatMapStep.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.process.traversal.step.map;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.util.iterator.EmptyIterator;
import java.util.Iterator;
@@ -41,7 +40,6 @@ public abstract class FlatMapStep<S, E> extends AbstractStep<S, E> {
@Override
protected Traverser.Admin<E> processNextStart() {
while (true) {
- if(Thread.interrupted()) throw new TraversalInterruptedException();
if (this.iterator.hasNext()) {
return this.head.split(this.iterator.next(), this);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/400e2763/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java
index ea72fb3..3f169b0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java
@@ -28,7 +28,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer;
import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.T;
@@ -126,7 +125,6 @@ public class GraphStep<S, E extends Element> extends AbstractStep<S, E> implemen
@Override
protected Traverser.Admin<E> processNextStart() {
while (true) {
- if(Thread.interrupted()) throw new TraversalInterruptedException();
if (this.iterator.hasNext()) {
return this.isStart ? this.getTraversal().getTraverserGenerator().generate(this.iterator.next(), (Step) this, 1l) : this.head.split(this.iterator.next(), this);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/400e2763/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchStep.java
index 907764b..c6e3bbe 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchStep.java
@@ -31,7 +31,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.Connec
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
@@ -447,7 +446,6 @@ public final class MatchStep<S, E> extends ComputerAwareStep<S, Map<String, E>>
@Override
protected Traverser.Admin<Object> processNextStart() throws NoSuchElementException {
while (true) {
- if(Thread.interrupted()) throw new TraversalInterruptedException();
final Traverser.Admin traverser = this.starts.next();
// no end label
if (null == this.matchKey) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/400e2763/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/NoOpBarrierStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/NoOpBarrierStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/NoOpBarrierStep.java
index cb2985b..bedf078 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/NoOpBarrierStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/NoOpBarrierStep.java
@@ -26,7 +26,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import java.util.Collections;
@@ -65,7 +64,6 @@ public final class NoOpBarrierStep<S> extends AbstractStep<S, S> implements Loca
@Override
public void processAllStarts() {
while (this.starts.hasNext() && (this.maxBarrierSize == Integer.MAX_VALUE || this.barrier.size() < this.maxBarrierSize)) {
- if(Thread.interrupted()) throw new TraversalInterruptedException();
final Traverser.Admin<S> traverser = this.starts.next();
traverser.setStepId(this.getNextStep().getId()); // when barrier is reloaded, the traversers should be at the next step
this.barrier.add(traverser);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/400e2763/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java
index 227ccf3..5acff58 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.process.traversal.step.util;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.iterator.EmptyIterator;
@@ -42,7 +41,6 @@ public abstract class ComputerAwareStep<S, E> extends AbstractStep<S, E> impleme
@Override
protected Traverser.Admin<E> processNextStart() throws NoSuchElementException {
while (true) {
- if(Thread.interrupted()) throw new TraversalInterruptedException();
if (this.previousIterator.hasNext())
return this.previousIterator.next();
this.previousIterator = this.traverserStepIdAndLabelsSetByChild ? this.computerAlgorithm() : this.standardAlgorithm();
[02/10] incubator-tinkerpop git commit: Cleaned up OptOuts around
traversal interruption test for giraph/spark.
Posted by sp...@apache.org.
Cleaned up OptOuts around traversal interruption test for giraph/spark.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ab96ed18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ab96ed18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ab96ed18
Branch: refs/heads/master
Commit: ab96ed18b13e959da654b844f791b08381f10bd3
Parents: 173034e
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Apr 22 15:49:29 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed May 4 06:43:35 2016 -0400
----------------------------------------------------------------------
.../apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ab96ed18/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index 08bbeeb..fce3953 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -182,10 +182,10 @@ import java.util.stream.Stream;
reason = "Spark executes map and combine in a lazy fashion and thus, fails the blocking aspect of this test",
computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"})
@Graph.OptOut(
- test = "org.apache.tinkerpop.gremlin.process.computer.TraversalInterruptionComputerTest",
+ test = "org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionComputerTest",
method = "*",
reason = "This test makes an use of a sideEffect to enforce when a thread interruption is triggered and thus isn't applicable to HadoopGraph",
- computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"})
+ computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer", "org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphGraphComputer"})
public final class HadoopGraph implements Graph {
public static final Logger LOGGER = LoggerFactory.getLogger(HadoopGraph.class);
[07/10] incubator-tinkerpop git commit: Added a couple more OptOuts.
Posted by sp...@apache.org.
Added a couple more OptOuts.
The interruption tests are not good for RemoteGraph or HadoopGraph. The interruption models are different there given their different processing. Those might need specific testing.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/f69e9915
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/f69e9915
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/f69e9915
Branch: refs/heads/master
Commit: f69e9915930eeae4651914b7ed02f32358d11ebc
Parents: ab96ed1
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Sat Apr 30 06:31:27 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed May 4 06:43:35 2016 -0400
----------------------------------------------------------------------
.../apache/tinkerpop/gremlin/process/remote/RemoteGraph.java | 8 ++++++++
.../tinkerpop/gremlin/hadoop/structure/HadoopGraph.java | 6 +++++-
2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f69e9915/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteGraph.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteGraph.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteGraph.java
index 78512e2..f46fe90 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteGraph.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteGraph.java
@@ -93,6 +93,14 @@ import java.util.Iterator;
test = "org.apache.tinkerpop.gremlin.process.computer.GraphComputerTest",
method = "*",
reason = "RemoteGraph does not support direct Graph.compute() access")
+@Graph.OptOut(
+ test = "org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest",
+ method = "*",
+ reason = "The interruption model in the test can't guarantee interruption at the right time with RemoteGraph.")
+@Graph.OptOut(
+ test = "org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionComputerTest",
+ method = "*",
+ reason = "The interruption model in the test can't guarantee interruption at the right time with RemoteGraph.")
public class RemoteGraph implements Graph {
private final RemoteConnection connection;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f69e9915/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index fce3953..bf2f08b 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -182,9 +182,13 @@ import java.util.stream.Stream;
reason = "Spark executes map and combine in a lazy fashion and thus, fails the blocking aspect of this test",
computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"})
@Graph.OptOut(
+ test = "org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest",
+ method = "*",
+ reason = "The interruption model in the test can't guarantee interruption at the right time with HadoopGraph.")
+@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionComputerTest",
method = "*",
- reason = "This test makes an use of a sideEffect to enforce when a thread interruption is triggered and thus isn't applicable to HadoopGraph",
+ reason = "This test makes use of a sideEffect to enforce when a thread interruption is triggered and thus isn't applicable to HadoopGraph",
computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer", "org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphGraphComputer"})
public final class HadoopGraph implements Graph {
[10/10] incubator-tinkerpop git commit: Updated upgrade docs about
traversal cancellation.
Posted by sp...@apache.org.
Updated upgrade docs about traversal cancellation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/8fd0bae0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/8fd0bae0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/8fd0bae0
Branch: refs/heads/master
Commit: 8fd0bae0fb91e2f1b911b6dc1623d0482b018070
Parents: a447a24
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Wed May 4 07:00:11 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed May 4 07:00:11 2016 -0400
----------------------------------------------------------------------
.../upgrade/release-3.2.x-incubating.asciidoc | 32 ++++++++++++++++++--
1 file changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8fd0bae0/docs/src/upgrade/release-3.2.x-incubating.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/upgrade/release-3.2.x-incubating.asciidoc b/docs/src/upgrade/release-3.2.x-incubating.asciidoc
index 7772e0f..0d5836d 100644
--- a/docs/src/upgrade/release-3.2.x-incubating.asciidoc
+++ b/docs/src/upgrade/release-3.2.x-incubating.asciidoc
@@ -27,13 +27,25 @@ TinkerPop 3.2.1
*Release Date: NOT OFFICIALLY RELEASED YET*
-Please see the link:https://github.com/apache/incubator-tinkerpop/blob/3.2.1-incubating/CHANGELOG.asciidoc#tinkerpop-313-release-date-MONTH-DAY-YEAR[changelog] for a complete list of all the modifications that are part of this release.
+Please see the link:https://github.com/apache/incubator-tinkerpop/blob/3.2.1-incubating/CHANGELOG.asciidoc#tinkerpop-321-release-date-MONTH-DAY-YEAR[changelog] for a complete list of all the modifications that are part of this release.
Upgrading for Users
~~~~~~~~~~~~~~~~~~~
+Interrupting Traversals
+^^^^^^^^^^^^^^^^^^^^^^^
+
+Traversals now better respect calls to `Thread.interrupt()`, which means that a running `Traversal` can now be
+cancelled. There are some limitations that remain, but most OLTP-based traversals should cancel without
+issue. OLAP-based traversals for Spark will also cancel and clean up running jobs in Spark itself. Mileage may vary
+on other process implementations and it is possible that graph providers could potentially write custom step
+implementations that prevent interruption. If it is found that there are configurations or specific traversals that
+do not respect interruption, please mention them on the mailing list.
+
+See: https://issues.apache.org/jira/browse/TINKERPOP-946[TINKERPOP-946]
+
Gremlin Console Flags
-+++++++++++++++++++++
+^^^^^^^^^^^^^^^^^^^^^
Gremlin Console had several methods for executing scripts from file at the start-up of `bin/gremlin.sh`. There were
two options:
@@ -71,6 +83,22 @@ link:https://issues.apache.org/jira/browse/TINKERPOP-1157[TINKERPOP-1157],
link:http://tinkerpop.apache.org/docs/3.2.1-incubating/reference/#interactive-mode[Reference Documentation - Interactive Mode],
link:http://tinkerpop.apache.org/docs/3.2.1-incubating/reference/#execution-mode[Reference Documentation - Execution Mode]
+Upgrading for Providers
+~~~~~~~~~~~~~~~~~~~~~~~
+
+Graph System Providers
+^^^^^^^^^^^^^^^^^^^^^^
+
+Interrupting Traversals
++++++++++++++++++++++++
+
+Several tests have been added to the TinkerPop test suite to validate that a `Traversal` can be cancelled with
+`Thread.interrupt()`. The test suite does not cover all possible traversal scenarios. When implementing custom steps,
+providers should take care to not ignore an `InterruptedException` that might be thrown in their code and to be sure
+to check `Thread.isInterrupted()` as needed to ensure that the step remains cancellation compliant.
+
+See: https://issues.apache.org/jira/browse/TINKERPOP-946[TINKERPOP-946]
+
TinkerPop 3.2.0
---------------
[04/10] incubator-tinkerpop git commit: Added support for Traversal
interruption.
Posted by sp...@apache.org.
Added support for Traversal interruption.
Check for Thread.interrupted() in loops within steps and throw an unchecked TraversalInterruptedException. Would be better to throw InterruptedException but that would involve a breaking change which would be nice to avoid.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/0d4c07fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/0d4c07fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/0d4c07fd
Branch: refs/heads/master
Commit: 0d4c07fd57dd1179c50d7acdedb21bb4e8e6ab52
Parents: 3e7d19d
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Mon Apr 18 08:50:33 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed May 4 06:43:35 2016 -0400
----------------------------------------------------------------------
.../benchmark/util/AbstractBenchmarkBase.java | 2 +-
.../traversal/step/map/ComputerResultStep.java | 2 +
.../traversal/step/map/VertexProgramStep.java | 2 +
.../traversal/step/branch/LocalStep.java | 2 +
.../traversal/step/filter/FilterStep.java | 2 +
.../process/traversal/step/map/FlatMapStep.java | 2 +
.../process/traversal/step/map/GraphStep.java | 2 +
.../process/traversal/step/map/MatchStep.java | 2 +
.../traversal/step/map/NoOpBarrierStep.java | 2 +
.../traversal/step/util/AbstractStep.java | 3 ++
.../traversal/step/util/ComputerAwareStep.java | 2 +
.../util/TraversalInterruptedException.java | 9 +++++
.../traversal/util/DefaultTraversalTest.java | 41 ++++++++++++++++++++
13 files changed, 72 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d4c07fd/gremlin-benchmark/src/main/java/org/apache/tinkerpop/benchmark/util/AbstractBenchmarkBase.java
----------------------------------------------------------------------
diff --git a/gremlin-benchmark/src/main/java/org/apache/tinkerpop/benchmark/util/AbstractBenchmarkBase.java b/gremlin-benchmark/src/main/java/org/apache/tinkerpop/benchmark/util/AbstractBenchmarkBase.java
index 5b73045..2f8bb66 100644
--- a/gremlin-benchmark/src/main/java/org/apache/tinkerpop/benchmark/util/AbstractBenchmarkBase.java
+++ b/gremlin-benchmark/src/main/java/org/apache/tinkerpop/benchmark/util/AbstractBenchmarkBase.java
@@ -50,7 +50,7 @@ public abstract class AbstractBenchmarkBase {
protected static final String DEFAULT_BENCHMARK_DIRECTORY = "./benchmarks/";
protected static final String[] JVM_ARGS = {
- "-server", "-Xms1g", "-Xmx1g"
+ "-server", "-Xms2g", "-Xmx2g"
};
@Test
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d4c07fd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java
index b5fd8e8..c8b56a2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java
@@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Property;
@@ -62,6 +63,7 @@ public final class ComputerResultStep<S> extends AbstractStep<ComputerResult, S>
@Override
protected Traverser.Admin<S> processNextStart() throws NoSuchElementException {
while (true) {
+ if(Thread.interrupted()) throw new TraversalInterruptedException();
if (this.currentIterator.hasNext())
return this.currentIterator.next();
else {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d4c07fd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
index f9e2936..ab095f2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/VertexProgramStep.java
@@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ProfileStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.Graph;
import java.util.NoSuchElementException;
@@ -83,6 +84,7 @@ public abstract class VertexProgramStep extends AbstractStep<ComputerResult, Com
protected boolean previousTraversalVertexProgram() {
Step<?, ?> currentStep = this;
while (!(currentStep instanceof EmptyStep)) {
+ if(Thread.interrupted()) throw new TraversalInterruptedException();
if (currentStep instanceof TraversalVertexProgramStep)
return true;
currentStep = currentStep.getPreviousStep();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d4c07fd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/LocalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/LocalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/LocalStep.java
index e2affd4..e0a6520 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/LocalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/LocalStep.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import java.util.Collections;
@@ -61,6 +62,7 @@ public final class LocalStep<S, E> extends AbstractStep<S, E> implements Travers
this.localTraversal.addStart(this.starts.next());
}
while (true) {
+ if(Thread.interrupted()) throw new TraversalInterruptedException();
if (this.localTraversal.hasNext())
return this.localTraversal.getEndStep().next();
else if (this.starts.hasNext()) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d4c07fd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/FilterStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/FilterStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/FilterStep.java
index e07d951..d264ec7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/FilterStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/FilterStep.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.traversal.step.filter;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -34,6 +35,7 @@ public abstract class FilterStep<S> extends AbstractStep<S, S> {
@Override
protected Traverser.Admin<S> processNextStart() {
while (true) {
+ if(Thread.interrupted()) throw new TraversalInterruptedException();
final Traverser.Admin<S> traverser = this.starts.next();
if (this.filter(traverser))
return traverser;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d4c07fd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FlatMapStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FlatMapStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FlatMapStep.java
index 3d7dc24..ea66696 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FlatMapStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FlatMapStep.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.traversal.step.map;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.util.iterator.EmptyIterator;
import java.util.Iterator;
@@ -40,6 +41,7 @@ public abstract class FlatMapStep<S, E> extends AbstractStep<S, E> {
@Override
protected Traverser.Admin<E> processNextStart() {
while (true) {
+ if(Thread.interrupted()) throw new TraversalInterruptedException();
if (this.iterator.hasNext()) {
return this.head.split(this.iterator.next(), this);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d4c07fd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java
index 3f169b0..ea72fb3 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer;
import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.T;
@@ -125,6 +126,7 @@ public class GraphStep<S, E extends Element> extends AbstractStep<S, E> implemen
@Override
protected Traverser.Admin<E> processNextStart() {
while (true) {
+ if(Thread.interrupted()) throw new TraversalInterruptedException();
if (this.iterator.hasNext()) {
return this.isStart ? this.getTraversal().getTraverserGenerator().generate(this.iterator.next(), (Step) this, 1l) : this.head.split(this.iterator.next(), this);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d4c07fd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchStep.java
index c6e3bbe..907764b 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchStep.java
@@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.Connec
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
@@ -446,6 +447,7 @@ public final class MatchStep<S, E> extends ComputerAwareStep<S, Map<String, E>>
@Override
protected Traverser.Admin<Object> processNextStart() throws NoSuchElementException {
while (true) {
+ if(Thread.interrupted()) throw new TraversalInterruptedException();
final Traverser.Admin traverser = this.starts.next();
// no end label
if (null == this.matchKey) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d4c07fd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/NoOpBarrierStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/NoOpBarrierStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/NoOpBarrierStep.java
index bedf078..cb2985b 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/NoOpBarrierStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/NoOpBarrierStep.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import java.util.Collections;
@@ -64,6 +65,7 @@ public final class NoOpBarrierStep<S> extends AbstractStep<S, S> implements Loca
@Override
public void processAllStarts() {
while (this.starts.hasNext() && (this.maxBarrierSize == Integer.MAX_VALUE || this.barrier.size() < this.maxBarrierSize)) {
+ if(Thread.interrupted()) throw new TraversalInterruptedException();
final Traverser.Admin<S> traverser = this.starts.next();
traverser.setStepId(this.getNextStep().getId()); // when barrier is reloaded, the traversers should be at the next step
this.barrier.add(traverser);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d4c07fd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/AbstractStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/AbstractStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/AbstractStep.java
index 8ad847a..2f83e9e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/AbstractStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/AbstractStep.java
@@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.util.EmptyTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import java.util.Collections;
@@ -123,6 +124,7 @@ public abstract class AbstractStep<S, E> implements Step<S, E> {
}
} else {
while (true) {
+ if(Thread.interrupted()) throw new TraversalInterruptedException();
final Traverser.Admin<E> traverser = this.processNextStart();
if (null != traverser.get() && 0 != traverser.bulk())
return this.prepareTraversalForNextStep(traverser);
@@ -137,6 +139,7 @@ public abstract class AbstractStep<S, E> implements Step<S, E> {
else {
try {
while (true) {
+ if(Thread.interrupted()) throw new TraversalInterruptedException();
this.nextEnd = this.processNextStart();
if (null != this.nextEnd.get() && 0 != this.nextEnd.bulk())
return true;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d4c07fd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java
index 5acff58..227ccf3 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.traversal.step.util;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.iterator.EmptyIterator;
@@ -41,6 +42,7 @@ public abstract class ComputerAwareStep<S, E> extends AbstractStep<S, E> impleme
@Override
protected Traverser.Admin<E> processNextStart() throws NoSuchElementException {
while (true) {
+ if(Thread.interrupted()) throw new TraversalInterruptedException();
if (this.previousIterator.hasNext())
return this.previousIterator.next();
this.previousIterator = this.traverserStepIdAndLabelsSetByChild ? this.computerAlgorithm() : this.standardAlgorithm();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d4c07fd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalInterruptedException.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalInterruptedException.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalInterruptedException.java
new file mode 100644
index 0000000..216a762
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalInterruptedException.java
@@ -0,0 +1,9 @@
+package org.apache.tinkerpop.gremlin.process.traversal.util;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+
+/**
+ * Unchecked exception thrown when a {@link Traversal} is interrupted during execution, i.e. when a step observes {@code Thread.interrupted()} while iterating.
+ */
+public class TraversalInterruptedException extends RuntimeException {
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d4c07fd/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversalTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversalTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversalTest.java
index d252ccf..c1a4c6b 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversalTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversalTest.java
@@ -28,17 +28,29 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
import org.apache.tinkerpop.gremlin.util.function.HashSetSupplier;
+import org.hamcrest.CoreMatchers;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.is;
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.hamcrest.number.OrderingComparison.lessThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -46,6 +58,35 @@ import static org.junit.Assert.assertTrue;
public class DefaultTraversalTest {
@Test
+ public void shouldRespectThreadInterruption() throws Exception {
+ final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch startedIterating = new CountDownLatch(100); // reaches zero after 100 sideEffect invocations
+ final List<Integer> l = IntStream.range(0, 1000000).boxed().collect(Collectors.toList());
+ final Thread t = new Thread(() -> {
+ try {
+ __.inject(l).unfold().sideEffect(i -> {
+ startedIterating.countDown();
+ counter.incrementAndGet();
+ }).iterate();
+ fail("Traversal should have been interrupted");
+ } catch (Exception ex) {
+ exceptionThrown.set(ex instanceof TraversalInterruptedException); // record whether the expected exception type surfaced
+ }
+ });
+
+ t.start();
+ startedIterating.await(); // block until the latch opens so the interrupt is sent only after iteration has begun
+ t.interrupt();
+ t.join();
+
+ // ensure that some but not all of the traversal was iterated and that the right exception was thrown
+ assertThat(counter.get(), greaterThan(0));
+ assertThat(counter.get(), lessThan(1000000));
+ assertThat(exceptionThrown.get(), CoreMatchers.is(true));
+ }
+
+ @Test
public void shouldCloneTraversalCorrectly() {
final DefaultGraphTraversal<?, ?> original = new DefaultGraphTraversal<>();
original.out().groupCount("m").values("name").count();