You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/12/20 20:25:24 UTC
[06/47] tinkerpop git commit: TINKERPOP-1490 Restructured Traversal.promise()
TINKERPOP-1490 Restructured Traversal.promise()
No longer uses an ExecutorService and is only applicable to "remote" traversals. Moved the commons-lang dependency back to gremlin-groovy for now.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/ee6a3589
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/ee6a3589
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/ee6a3589
Branch: refs/heads/TINKERPOP-1130
Commit: ee6a35893661b015dbb827463f175ddcecf1bcb8
Parents: eb08976
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Nov 11 12:51:40 2016 -0500
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri Dec 16 10:00:40 2016 -0500
----------------------------------------------------------------------
gremlin-core/pom.xml | 5 -
.../process/remote/RemoteConnection.java | 12 +-
.../remote/traversal/RemoteTraversal.java | 2 +-
.../remote/traversal/step/map/RemoteStep.java | 32 +++-
.../gremlin/process/traversal/Traversal.java | 57 ++------
.../traversal/util/DefaultTraversal.java | 37 -----
.../process/traversal/TraversalTest.java | 145 -------------------
.../tinkerpop/gremlin/driver/Connection.java | 6 +-
.../driver/remote/DriverRemoteConnection.java | 14 ++
.../driver/remote/DriverRemoteTraversal.java | 16 +-
.../DriverRemoteTraversalSideEffects.java | 22 ++-
.../DriverRemoteTraversalSideEffectsTest.java | 12 +-
gremlin-groovy/pom.xml | 5 +
.../server/GremlinServerIntegrateTest.java | 25 ++++
.../process/traversal/CoreTraversalTest.java | 42 ------
15 files changed, 131 insertions(+), 301 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/pom.xml
----------------------------------------------------------------------
diff --git a/gremlin-core/pom.xml b/gremlin-core/pom.xml
index 0594448..e8f3a34 100644
--- a/gremlin-core/pom.xml
+++ b/gremlin-core/pom.xml
@@ -61,11 +61,6 @@ limitations under the License.
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>3.3.1</version>
- </dependency>
<!-- LOGGING -->
<dependency>
<groupId>org.slf4j</groupId>
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
index 8506ad7..f4e3976 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
/**
* A simple abstraction of a "connection" to a "server" that is capable of processing a {@link Traversal} and
@@ -43,9 +44,16 @@ public interface RemoteConnection extends AutoCloseable {
public <E> Iterator<Traverser.Admin<E>> submit(final Traversal<?, E> traversal) throws RemoteConnectionException;
/**
- * Submits {@link Traversal} {@link Bytecode} to a server and returns a {@link Traversal}.
- * The {@link Traversal} is an abstraction over two types of results that can be returned as part of the
+ * Submits {@link Traversal} {@link Bytecode} to a server and returns a {@link RemoteTraversal}.
+ * The {@link RemoteTraversal} is an abstraction over two types of results that can be returned as part of the
* response from the server: the results of the {@link Traversal} itself and the side-effects that it produced.
*/
public <E> RemoteTraversal<?,E> submit(final Bytecode bytecode) throws RemoteConnectionException;
+
+ /**
+ * Submits {@link Traversal} {@link Bytecode} to a server and returns a promise of a {@link RemoteTraversal}.
+ * The {@link RemoteTraversal} is an abstraction over two types of results that can be returned as part of the
+ * response from the server: the results of the {@link Traversal} itself and the side-effects that it produced.
+ */
+ public <E> CompletableFuture<RemoteTraversal<?, E>> submitAsync(final Bytecode bytecode) throws RemoteConnectionException;
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java
index 9c893c2..57b0cda 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java
@@ -39,7 +39,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
public interface RemoteTraversal<S,E> extends Traversal.Admin<S,E> {
/**
- * Returns remote side-effects generated by the traversal so that they can accessible to the client. Note that
+ * Returns remote side-effects generated by the traversal so that they can be accessible to the client. Note that
* "side-effect" refers to the value in "a" in the traversal {@code g.V().aggregate('a').values('name')}.
*/
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
index 6b2be96..3e19097 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
@@ -21,12 +21,17 @@ package org.apache.tinkerpop.gremlin.process.remote.traversal.step.map;
import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
import org.apache.tinkerpop.gremlin.process.remote.RemoteConnectionException;
import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Sends a {@link Traversal} to a {@link RemoteConnection} and iterates back the results.
@@ -38,6 +43,7 @@ public final class RemoteStep<S, E> extends AbstractStep<S, E> {
private transient RemoteConnection remoteConnection;
private RemoteTraversal<?, E> remoteTraversal;
+ private final AtomicReference<CompletableFuture<Traversal<?, E>>> traversalFuture = new AtomicReference<>(null);
public RemoteStep(final Traversal.Admin traversal, final RemoteConnection remoteConnection) {
super(traversal);
@@ -51,14 +57,26 @@ public final class RemoteStep<S, E> extends AbstractStep<S, E> {
@Override
protected Traverser.Admin<E> processNextStart() throws NoSuchElementException {
- if (null == this.remoteTraversal) {
- try {
- this.remoteTraversal = this.remoteConnection.submit(this.traversal.getBytecode());
- this.traversal.setSideEffects(this.remoteTraversal.getSideEffects());
- } catch (final RemoteConnectionException sce) {
- throw new IllegalStateException(sce);
+ if (null == this.remoteTraversal) promise().join();
+ return this.remoteTraversal.nextTraverser();
+ }
+
+ /**
+ * Submits the traversal asynchronously to a "remote" using {@link RemoteConnection#submitAsync(Bytecode)}.
+ */
+ public CompletableFuture<Traversal<?, E>> promise() {
+ try {
+ if (null == traversalFuture.get()) {
+ traversalFuture.set(this.remoteConnection.submitAsync(this.traversal.getBytecode()).<Traversal<?, E>>thenApply(t -> {
+ this.remoteTraversal = (RemoteTraversal<?, E>) t;
+ this.traversal.setSideEffects(this.remoteTraversal.getSideEffects());
+ return traversal;
+ }));
}
+
+ return traversalFuture.get();
+ } catch (RemoteConnectionException rce) {
+ throw new IllegalStateException(rce);
}
- return this.remoteTraversal.nextTraverser();
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Traversal.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Traversal.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Traversal.java
index e4ba5a6..04f5127 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Traversal.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Traversal.java
@@ -18,8 +18,9 @@
*/
package org.apache.tinkerpop.gremlin.process.traversal;
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.step.map.RemoteStep;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.ProfileSideEffectStep;
@@ -43,11 +44,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
@@ -148,39 +145,21 @@ public interface Traversal<S, E> extends Iterator<E>, Serializable, Cloneable, A
/**
* Starts a promise to execute a function on the current {@code Traversal} that will be completed in the future.
- * This implementation uses {@link Admin#traversalExecutorService} to execute the supplied
- * {@code traversalFunction}.
+ * Note that this method can only be used if the {@code Traversal} is constructed using
+ * {@link TraversalSource#withRemote(Configuration)}. Calling this method otherwise will yield an
+ * {@code IllegalStateException}.
*/
public default <T> CompletableFuture<T> promise(final Function<Traversal, T> traversalFunction) {
- return promise(traversalFunction, Admin.traversalExecutorService);
- }
-
- /**
- * Starts a promise to execute a function on the current {@code Traversal} that will be completed in the future.
- * This implementation uses the caller supplied {@code ExecutorService} to execute the {@code traversalFunction}.
- */
- public default <T> CompletableFuture<T> promise(final Function<Traversal, T> traversalFunction, final ExecutorService service) {
- final CompletableFuture<T> promise = new CompletableFuture<>();
- final Future iterationFuture = service.submit(() -> {
- try {
- promise.complete(traversalFunction.apply(this));
- } catch (Exception ex) {
- // the promise may have been cancelled by the caller, in which case, there is no need to attempt
- // another write on completion
- if (!promise.isDone()) promise.completeExceptionally(ex);
- }
- });
-
- // if the user cancels the promise then attempt to kill the iteration.
- promise.exceptionally(t -> {
- if (t instanceof CancellationException) {
- iterationFuture.cancel(true);
- }
-
- return null;
- });
-
- return promise;
+ // apply strategies to see if RemoteStrategy has any effect (i.e. add RemoteStep)
+ if (!this.asAdmin().isLocked()) this.asAdmin().applyStrategies();
+
+ // use the end step so the results are bulked
+ final Step<?, E> endStep = this.asAdmin().getEndStep();
+ if (endStep instanceof RemoteStep) {
+ return ((RemoteStep) endStep).promise().thenApply(traversalFunction);
+ } else {
+ throw new IllegalStateException("Only traversals created using withRemote() can be used in an async way");
+ }
}
/**
@@ -297,12 +276,6 @@ public interface Traversal<S, E> extends Iterator<E>, Serializable, Cloneable, A
public interface Admin<S, E> extends Traversal<S, E> {
/**
- * Service that handles promises.
- */
- static final ExecutorService traversalExecutorService = Executors.newCachedThreadPool(
- new BasicThreadFactory.Builder().namingPattern("traversal-executor-%d").build());
-
- /**
* Get the {@link Bytecode} associated with the construction of this traversal.
*
* @return the byte code representation of the traversal
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
index 6ce6dfe..3c21e37 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
@@ -43,9 +43,6 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.function.Function;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -328,40 +325,6 @@ public class DefaultTraversal<S, E> implements Traversal.Admin<S, E> {
this.graph = graph;
}
- /**
- * Override of {@link Traversal#promise(Function)} that is aware of graph transactions.
- */
- @Override
- public <T2> CompletableFuture<T2> promise(final Function<Traversal, T2> traversalFunction) {
- return this.promise(traversalFunction, Traversal.Admin.traversalExecutorService);
- }
-
- /**
- * Override of {@link Traversal#promise(Function)} that is aware of graph transactions. In a transactional graph
- * a promise represents the full scope of a transaction, even if the graph is only partially iterated.
- */
- @Override
- public <T2> CompletableFuture<T2> promise(final Function<Traversal, T2> traversalFunction, final ExecutorService service) {
- if (graph != null && graph.features().graph().supportsTransactions()) {
- final Function<Traversal, T2> transactionAware = traversal -> {
-
- try {
- if (graph.tx().isOpen()) graph.tx().rollback();
- final T2 obj = traversalFunction.apply(traversal);
- if (graph.tx().isOpen()) graph.tx().commit();
- return obj;
- } catch (Exception ex) {
- if (graph.tx().isOpen()) graph.tx().rollback();
- throw ex;
- }
- };
-
- return Traversal.Admin.super.promise(transactionAware, service);
- } else {
- return Traversal.Admin.super.promise(traversalFunction, service);
- }
- }
-
@Override
public boolean equals(final Object other) {
return other != null && other.getClass().equals(this.getClass()) && this.equals(((Traversal.Admin) other));
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalTest.java
index aa1b99b..c427d8e 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalTest.java
@@ -30,34 +30,22 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.NoSuchElementException;
import java.util.Optional;
-import java.util.Random;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsCollectionContaining.hasItems;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
/**
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public class TraversalTest {
- private final ExecutorService service = Executors.newFixedThreadPool(2);
-
@Test
public void shouldTryNext() {
final MockTraversal<Integer> t = new MockTraversal<>(1, 2, 3);
@@ -117,139 +105,6 @@ public class TraversalTest {
}
@Test
- public void shouldPromiseNextThreeUsingForkJoin() throws Exception {
- final MockTraversal<Integer> t = new MockTraversal<>(1, 2, 3, 4, 5, 6, 7);
- final CompletableFuture<List<Integer>> promiseFirst = t.promise(traversal -> traversal.next(3));
- final List<Integer> listFirst = promiseFirst.get();
- assertEquals(3, listFirst.size());
- assertThat(listFirst, hasItems(1 ,2, 3));
- assertThat(t.hasNext(), is(true));
- assertThat(promiseFirst.isDone(), is(true));
-
- final CompletableFuture<List<Integer>> promiseSecond = t.promise(traversal -> traversal.next(3));
- final List<Integer> listSecond = promiseSecond.get();
- assertEquals(3, listSecond.size());
- assertThat(listSecond, hasItems(4, 5, 6));
- assertThat(t.hasNext(), is(true));
- assertThat(promiseSecond.isDone(), is(true));
-
- final CompletableFuture<List<Integer>> promiseThird = t.promise(traversal -> traversal.next(3));
- final List<Integer> listThird = promiseThird.get();
- assertEquals(1, listThird.size());
- assertThat(listThird, hasItems(7));
- assertThat(t.hasNext(), is(false));
- assertThat(promiseThird.isDone(), is(true));
-
- final CompletableFuture<Integer> promiseDead = t.promise(traversal -> (Integer) traversal.next());
- final AtomicBoolean dead = new AtomicBoolean(false);
- promiseDead.exceptionally(tossed -> {
- dead.set(tossed instanceof NoSuchElementException);
- return null;
- });
-
- try {
- promiseDead.get(10000, TimeUnit.MILLISECONDS);
- fail("Should have gotten an exception");
- } catch (Exception ex) {
- if (ex instanceof TimeoutException) {
- fail("This should not have timed out but should have gotten an exception caught above in the exceptionally() clause");
- }
-
- assertThat(ex.getCause(), instanceOf(NoSuchElementException.class));
- }
-
- assertThat(dead.get(), is(true));
- assertThat(t.hasNext(), is(false));
- assertThat(promiseDead.isDone(), is(true));
- }
-
- @Test
- public void shouldPromiseNextThreeUsingSpecificExecutor() throws Exception {
- final MockTraversal<Integer> t = new MockTraversal<>(1, 2, 3, 4, 5, 6, 7);
- final CompletableFuture<List<Integer>> promiseFirst = t.promise(traversal -> traversal.next(3), service);
- final List<Integer> listFirst = promiseFirst.get();
- assertEquals(3, listFirst.size());
- assertThat(listFirst, hasItems(1 ,2, 3));
- assertThat(t.hasNext(), is(true));
- assertThat(promiseFirst.isDone(), is(true));
-
- final CompletableFuture<List<Integer>> promiseSecond = t.promise(traversal -> traversal.next(3), service);
- final List<Integer> listSecond = promiseSecond.get();
- assertEquals(3, listSecond.size());
- assertThat(listSecond, hasItems(4, 5, 6));
- assertThat(t.hasNext(), is(true));
- assertThat(promiseSecond.isDone(), is(true));
-
- final CompletableFuture<List<Integer>> promiseThird = t.promise(traversal -> traversal.next(3), service);
- final List<Integer> listThird = promiseThird.get();
- assertEquals(1, listThird.size());
- assertThat(listThird, hasItems(7));
- assertThat(t.hasNext(), is(false));
- assertThat(promiseThird.isDone(), is(true));
-
- final CompletableFuture<Integer> promiseDead = t.promise(traversal -> (Integer) traversal.next(), service);
- final AtomicBoolean dead = new AtomicBoolean(false);
- promiseDead.exceptionally(tossed -> {
- dead.set(tossed instanceof NoSuchElementException);
- return null;
- });
-
- try {
- promiseDead.get(10000, TimeUnit.MILLISECONDS);
- fail("Should have gotten an exception");
- } catch (Exception ex) {
- if (ex instanceof TimeoutException) {
- fail("This should not have timed out but should have gotten an exception caught above in the exceptionally() clause");
- }
-
- assertThat(ex.getCause(), instanceOf(NoSuchElementException.class));
- }
-
- assertThat(dead.get(), is(true));
- assertThat(t.hasNext(), is(false));
- assertThat(promiseDead.isDone(), is(true));
- }
-
- @Test
- public void shouldInterruptTraversalFunction() throws Exception {
- final Random rand = new Random(1234567890);
-
- // infinite traversal
- final MockTraversal<Integer> t = new MockTraversal<>(IntStream.generate(rand::nextInt).iterator());
-
- // iterate a bunch of it
- final CompletableFuture<List<Integer>> promise10 = t.promise(traversal -> traversal.next(10), service);
- assertEquals(10, promise10.get(10000, TimeUnit.MILLISECONDS).size());
- final CompletableFuture<List<Integer>> promise100 = t.promise(traversal -> traversal.next(100), service);
- assertEquals(100, promise100.get(10000, TimeUnit.MILLISECONDS).size());
- final CompletableFuture<List<Integer>> promise1000 = t.promise(traversal -> traversal.next(1000), service);
- assertEquals(1000, promise1000.get(10000, TimeUnit.MILLISECONDS).size());
-
- // this is endless, so let's cancel
- final CompletableFuture<List<Integer>> promiseForevers = t.promise(traversal -> traversal.next(Integer.MAX_VALUE), service);
-
- // specify what to do on exception
- final AtomicBoolean failed = new AtomicBoolean(false);
- promiseForevers.exceptionally(ex -> {
- failed.set(true);
- return null;
- });
-
- try {
- // let it actually iterate a moment
- promiseForevers.get(500, TimeUnit.MILLISECONDS);
- fail("This should have timed out because the traversal has infinite items in it");
- } catch (TimeoutException tex) {
-
- }
-
- assertThat(promiseForevers.isDone(), is(false));
- promiseForevers.cancel(true);
- assertThat(failed.get(), is(true));
- assertThat(promiseForevers.isDone(), is(true));
- }
-
- @Test
public void shouldIterate() {
final MockTraversal<Integer> t = new MockTraversal<>(1, 2, 3, 4, 5, 6, 7);
assertThat(t.hasNext(), is(true));
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 972e838..9a2180e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -208,7 +208,7 @@ final class Connection {
logger.debug(String.format("Write on connection %s failed", thisConnection.getConnectionInfo()), f.cause());
thisConnection.isDead = true;
thisConnection.returnToPool();
- future.completeExceptionally(f.cause());
+ cluster.executor().submit(() -> future.completeExceptionally(f.cause()));
} else {
final LinkedBlockingQueue<Result> resultLinkedBlockingQueue = new LinkedBlockingQueue<>();
final CompletableFuture<Void> readCompleted = new CompletableFuture<>();
@@ -250,8 +250,8 @@ final class Connection {
final ResultQueue handler = new ResultQueue(resultLinkedBlockingQueue, readCompleted);
pending.put(requestMessage.getRequestId(), handler);
- future.complete(new ResultSet(handler, cluster.executor(), readCompleted,
- requestMessage, pool.host));
+ cluster.executor().submit(() -> future.complete(
+ new ResultSet(handler, cluster.executor(), readCompleted, requestMessage, pool.host)));
}
});
channel.writeAndFlush(requestMessage, requestPromise);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
index bb2d33d..be3fa28 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
@@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import java.util.Iterator;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
/**
@@ -163,6 +164,10 @@ public class DriverRemoteConnection implements RemoteConnection {
}
}
+ /**
+ * @deprecated As of release 3.2.2, replaced by {@link #submit(Bytecode)}.
+ */
+ @Deprecated
@Override
public <E> Iterator<Traverser.Admin<E>> submit(final Traversal<?, E> t) throws RemoteConnectionException {
try {
@@ -189,6 +194,15 @@ public class DriverRemoteConnection implements RemoteConnection {
}
@Override
+ public <E> CompletableFuture<RemoteTraversal<?, E>> submitAsync(final Bytecode bytecode) throws RemoteConnectionException {
+ try {
+ return client.submitAsync(bytecode).thenApply(rs -> new DriverRemoteTraversal<>(rs, client, attachElements, conf));
+ } catch (Exception ex) {
+ throw new RemoteConnectionException(ex);
+ }
+ }
+
+ @Override
public void close() throws Exception {
try {
client.close();
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java
index 88ee794..d3f290c 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java
@@ -66,28 +66,18 @@ public class DriverRemoteTraversal<S, E> extends AbstractRemoteTraversal<S, E> {
}
this.rs = rs;
- this.sideEffects = new DriverRemoteTraversalSideEffects(
- client,
+ this.sideEffects = new DriverRemoteTraversalSideEffects(client,
rs.getOriginalRequestMessage().getRequestId(),
- rs.getHost());
+ rs.getHost(), rs.allItemsAvailableAsync());
}
/**
* Gets a side-effect from the server. Do not call this method prior to completing the iteration of the
- * {@link DriverRemoteTraversal} that spawned this as the side-effect will not be ready. If this method is called
- * prior to iteration being complete, then it will block until the traversal notifies it of completion. Generally
+ * {@link DriverRemoteTraversal} that spawned this as the side-effect will not be ready. Generally
* speaking, the common user would not get side-effects this way - they would use a call to {@code cap()}.
*/
@Override
public RemoteTraversalSideEffects getSideEffects() {
- // wait for the read to complete (i.e. iteration on the server) before allowing the caller to get the
- // side-effect. calling prior to this will result in the side-effect not being found. of course, the
- // bad part here is that the method blocks indefinitely waiting for the result, but it prevents the
- // test failure problems that happen on slower systems. in practice, it's unlikely that a user would
- // try to get a side-effect prior to iteration, but since the API allows it, this at least prevents
- // the error.
- rs.allItemsAvailableAsync().join();
-
return this.sideEffects;
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java
index 8d6fa98..4305567 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java
@@ -33,6 +33,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
/**
* Java driver implementation of {@link TraversalSideEffects}. This class is not thread safe.
@@ -50,15 +51,26 @@ public class DriverRemoteTraversalSideEffects extends AbstractRemoteTraversalSid
private boolean closed = false;
private boolean retrievedAllKeys = false;
+ private final CompletableFuture<Void> ready;
- public DriverRemoteTraversalSideEffects(final Client client, final UUID serverSideEffect, final Host host) {
+ public DriverRemoteTraversalSideEffects(final Client client, final UUID serverSideEffect, final Host host,
+ final CompletableFuture<Void> ready) {
this.client = client;
this.serverSideEffect = serverSideEffect;
this.host = host;
+ this.ready = ready;
}
@Override
public <V> V get(final String key) throws IllegalArgumentException {
+ // wait for the read to complete (i.e. iteration on the server) before allowing the caller to get the
+ // side-effect. calling prior to this will result in the side-effect not being found. of course, the
+ // bad part here is that the method blocks indefinitely waiting for the result, but it prevents the
+ // test failure problems that happen on slower systems. in practice, it's unlikely that a user would
+ // try to get a side-effect prior to iteration, but since the API allows it, this at least prevents
+ // the error.
+ ready.join();
+
if (!keys().contains(key)) throw TraversalSideEffects.Exceptions.sideEffectKeyDoesNotExist(key);
if (!sideEffects.containsKey(key)) {
@@ -91,6 +103,14 @@ public class DriverRemoteTraversalSideEffects extends AbstractRemoteTraversalSid
@Override
public Set<String> keys() {
+ // wait for the read to complete (i.e. iteration on the server) before allowing the caller to get the
+ // side-effect keys. calling prior to this will result in the side-effects not being found. of course, the
+ // bad part here is that the method blocks indefinitely waiting for the result, but it prevents the
+ // test failure problems that happen on slower systems. in practice, it's unlikely that a user would
+ // try to get the side-effect keys prior to iteration, but since the API allows it, this at least prevents
+ // the error.
+ ready.join();
+
if (closed && !retrievedAllKeys) throw new IllegalStateException("Traversal has been closed - side-effect keys cannot be retrieved");
if (!retrievedAllKeys) {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffectsTest.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffectsTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffectsTest.java
index 27d0079..4e6df93 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffectsTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffectsTest.java
@@ -52,7 +52,9 @@ public class DriverRemoteTraversalSideEffectsTest extends AbstractResultQueueTes
mockClientForCall(client);
final UUID sideEffectKey = UUID.fromString("31dec2c6-b214-4a6f-a68b-996608dce0d9");
- final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null);
+ final CompletableFuture<Void> ready = new CompletableFuture<>();
+ ready.complete(null);
+ final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null, ready);
assertEquals(1, sideEffects.keys().size());
sideEffects.close();
@@ -73,7 +75,9 @@ public class DriverRemoteTraversalSideEffectsTest extends AbstractResultQueueTes
mockClientForCall(client);
mockClientForCall(client);
final UUID sideEffectKey = UUID.fromString("31dec2c6-b214-4a6f-a68b-996608dce0d9");
- final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null);
+ final CompletableFuture<Void> ready = new CompletableFuture<>();
+ ready.complete(null);
+ final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null, ready);
assertNotNull(sideEffects.get("test-0"));
sideEffects.close();
@@ -93,7 +97,9 @@ public class DriverRemoteTraversalSideEffectsTest extends AbstractResultQueueTes
mockClientForCall(client);
final UUID sideEffectKey = UUID.fromString("31dec2c6-b214-4a6f-a68b-996608dce0d9");
- final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null);
+ final CompletableFuture<Void> ready = new CompletableFuture<>();
+ ready.complete(null);
+ final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null, ready);
sideEffects.close();
sideEffects.close();
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-groovy/pom.xml
----------------------------------------------------------------------
diff --git a/gremlin-groovy/pom.xml b/gremlin-groovy/pom.xml
index dae5e8a..b82c986 100644
--- a/gremlin-groovy/pom.xml
+++ b/gremlin-groovy/pom.xml
@@ -65,6 +65,11 @@ limitations under the License.
<artifactId>jBCrypt</artifactId>
<version>jbcrypt-0.4</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.3.1</version>
+ </dependency>
<!-- TEST -->
<dependency>
<groupId>org.slf4j</groupId>
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index b3dbe29..420bd05 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -46,12 +46,14 @@ import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.InterpreterModeCust
import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.SimpleSandboxExtension;
import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.TimedInterruptCustomizerProvider;
import org.apache.tinkerpop.gremlin.process.remote.RemoteGraph;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.server.channel.NioChannelizer;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
@@ -76,6 +78,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -990,4 +993,26 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
final BulkSet localBSideEffects = se.get("b");
assertThat(localBSideEffects.isEmpty(), is(false));
}
+
+ @Test
+ public void shouldDoNonBlockingPromiseWithRemote() throws Exception {
+ final Graph graph = EmptyGraph.instance();
+ final GraphTraversalSource g = graph.traversal().withRemote(conf);
+ g.addV("person").property("age", 20).promise(Traversal::iterate).join();
+ g.addV("person").property("age", 10).promise(Traversal::iterate).join();
+ assertEquals(50L, g.V().hasLabel("person").map(Lambda.function("it.get().value('age') + 10")).sum().promise(t -> t.next()).join());
+ g.addV("person").property("age", 20).promise(Traversal::iterate).join();
+
+ final Traversal traversal = g.V().hasLabel("person").has("age", 20).values("age");
+ assertEquals(20, traversal.promise(t -> ((Traversal) t).next(1).get(0)).join());
+ assertEquals(20, traversal.next());
+ assertThat(traversal.hasNext(), is(false));
+
+ final Traversal traversalCloned = g.V().hasLabel("person").has("age", 20).values("age");
+ assertEquals(20, traversalCloned.next());
+ assertEquals(20, traversalCloned.promise(t -> ((Traversal) t).next(1).get(0)).join());
+ assertThat(traversalCloned.promise(t -> ((Traversal) t).hasNext()).join(), is(false));
+
+ assertEquals(3, g.V().promise(Traversal::toList).join().size());
+ }
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
index 050f9de..68f8217 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java
@@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.process.traversal;
import org.apache.tinkerpop.gremlin.ExceptionCoverage;
import org.apache.tinkerpop.gremlin.FeatureRequirement;
-import org.apache.tinkerpop.gremlin.FeatureRequirementSet;
import org.apache.tinkerpop.gremlin.LoadGraphWith;
import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
@@ -41,9 +40,6 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
@@ -311,42 +307,4 @@ public class CoreTraversalTest extends AbstractGremlinProcessTest {
}
}
-
- @Test
- @FeatureRequirementSet(FeatureRequirementSet.Package.SIMPLE)
- public void shouldUsePromiseAndControlTransactionsIfAvailable() throws Exception {
- // this test will validate that transactional graphs can properly open/close transactions within a promise.
- // as there is a feature check, non-transactional graphs can use this to simply exercise the promise API
- final Vertex vAdded = g.addV("person").property("name", "stephen").promise(t -> (Vertex) t.next()).get(10000, TimeUnit.MILLISECONDS);
- final Vertex vRead = g.V().has("name", "stephen").next();
- assertEquals(vAdded.id(), vRead.id());
-
- // transaction should have been committed at this point so test the count in this thread to validate persistence
- assertVertexEdgeCounts(graph, 1, 0);
-
- // cancel a promise and ensure the transaction ended in failure. hold the traversal in park until it can be
- // interrupted, then the promise will have to rollback the transaction.
- final CompletableFuture promiseToCancel = g.addV("person").property("name", "marko").sideEffect(traverser -> {
- try {
- Thread.sleep(100000);
- } catch (Exception ignored) {
-
- }
- }).promise(t -> (Vertex) t.next());
-
- try {
- promiseToCancel.get(500, TimeUnit.MILLISECONDS);
- fail("Should have timed out");
- } catch (TimeoutException te) {
-
- }
-
- promiseToCancel.cancel(true);
-
- // graphs that support transactions will rollback the transaction
- if (graph.features().graph().supportsTransactions())
- assertVertexEdgeCounts(graph, 1, 0);
- else
- assertVertexEdgeCounts(graph, 2, 0);
- }
}