You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/08/05 20:11:17 UTC
[2/2] tinkerpop git commit: Major refactoring of TraversalOpProcessor
work
Major refactoring of TraversalOpProcessor work
Altered the RemoteConnection API (and related interfaces). Used a cache for sideeffects and protocol features that enable retrieval of them on demand.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/a6ab71be
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/a6ab71be
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/a6ab71be
Branch: refs/heads/TINKERPOP-1278
Commit: a6ab71be80f09c2400ec3bf4c6ec080638450c62
Parents: 13f2a14
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Aug 5 16:02:15 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri Aug 5 16:02:15 2016 -0400
----------------------------------------------------------------------
.../process/remote/RemoteConnection.java | 9 +-
.../gremlin/process/remote/RemoteGraph.java | 4 +
.../gremlin/process/remote/RemoteResponse.java | 54 -------
.../traversal/AbstractRemoteTraversal.java | 117 ++++++++++++++
.../AbstractRemoteTraversalSideEffects.java | 113 +++++++++++++
.../remote/traversal/RemoteTraversal.java | 29 ++++
.../traversal/RemoteTraversalSideEffects.java | 27 ++++
.../remote/traversal/step/map/RemoteStep.java | 15 +-
.../apache/tinkerpop/gremlin/driver/Client.java | 9 ++
.../tinkerpop/gremlin/driver/Connection.java | 2 +-
.../tinkerpop/gremlin/driver/Handler.java | 12 +-
.../tinkerpop/gremlin/driver/ResultQueue.java | 73 +++------
.../tinkerpop/gremlin/driver/ResultSet.java | 18 +--
.../apache/tinkerpop/gremlin/driver/Tokens.java | 10 +-
.../gremlin/driver/message/RequestMessage.java | 8 +
.../driver/remote/DriverRemoteConnection.java | 10 +-
.../driver/remote/DriverRemoteResponse.java | 107 -------------
.../driver/remote/DriverRemoteTraversal.java | 114 +++++++++++++
.../DriverRemoteTraversalSideEffects.java | 82 ++++++++++
.../remote/DriverTraversalSideEffects.java | 150 -----------------
.../gremlin/driver/ResultQueueTest.java | 146 ++++++-----------
.../tinkerpop/gremlin/driver/ResultSetTest.java | 29 +---
gremlin-server/pom.xml | 5 +
.../op/traversal/TraversalOpProcessor.java | 160 +++++++++++++++++--
.../gremlin/server/util/SideEffectIterator.java | 94 +++++++++++
.../gremlin/server/util/TraversalIterator.java | 109 +------------
.../server/util/TraversalIteratorTest.java | 126 ---------------
27 files changed, 867 insertions(+), 765 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
index 870bed4..f2cc399 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.process.remote;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -28,7 +29,7 @@ import java.util.Iterator;
* A simple abstraction of a "connection" to a "server" that is capable of processing a {@link Traversal} and
* returning results. Results refer to both the {@link Iterator} of results from the submitted {@link Traversal}
* as well as the side-effects produced by that {@link Traversal}. Those results together are wrapped in a
- * {@link RemoteResponse}.
+ * {@link Traversal}.
*
* @author Stephen Mallette (http://stephen.genoprime.com)
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -42,9 +43,9 @@ public interface RemoteConnection extends AutoCloseable {
public <E> Iterator<Traverser.Admin<E>> submit(final Traversal<?, E> traversal) throws RemoteConnectionException;
/**
- * Submits {@link Traversal} {@link Bytecode} to a server and returns a {@link RemoteResponse}.
- * The {@link RemoteResponse} is an abstraction over two types of results that can be returned as part of the
+ * Submits {@link Traversal} {@link Bytecode} to a server and returns a {@link Traversal}.
+ * The {@link Traversal} is an abstraction over two types of results that can be returned as part of the
* response from the server: the results of the {@link Traversal} itself and the the side-effects that it produced.
*/
- public <E> RemoteResponse<E> submit(final Bytecode bytecode) throws RemoteConnectionException;
+ public <E> RemoteTraversal<?,E> submit(final Bytecode bytecode) throws RemoteConnectionException;
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteGraph.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteGraph.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteGraph.java
index c5c1cac..9852ed0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteGraph.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteGraph.java
@@ -46,6 +46,10 @@ import java.util.Iterator;
@Graph.OptIn(Graph.OptIn.SUITE_PROCESS_STANDARD)
@Graph.OptIn(Graph.OptIn.SUITE_PROCESS_COMPUTER)
@Graph.OptOut(
+ test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupCountTest",
+ method = "g_V_hasXnoX_groupCountXaX_capXaX",
+ reason = "This test asserts an empty side-effect which reflects as a null rather than an \"empty\" and thus doens't assert")
+@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.BranchTest",
method = "g_V_branchXlabelX_optionXperson__ageX_optionXsoftware__langX_optionXsoftware__nameX",
reason = "Issues with Longs")
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteResponse.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteResponse.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteResponse.java
deleted file mode 100644
index ee7a5fd..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteResponse.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.process.remote;
-
-import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraverser;
-import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalSideEffects;
-
-import java.util.Iterator;
-
-/**
- * The {@code RemoteResponse} is returned from {@link RemoteConnection#submit(Bytecode)} and provides implementers a
- * way to represent how they will return the results of a submitted {@link Traversal} and its side-effects. The
- * {@code RemoteResponse} is used internally by traversals spawned from a {@link RemoteGraph} to put remote results
- * into the streams of those traversals and to replace client-side {@link TraversalSideEffects} in those traversals.
- *
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-public interface RemoteResponse<E> {
-
- /**
- * Gets the list of results from a {@link Traversal} executed remotely. Implementers may push their results into
- * a {@link RemoteTraverser} instance to feed that {@code Iterator} or create their own implementation of it if
- * there is some advantage to doing so.
- */
- public Iterator<Traverser.Admin<E>> getTraversers();
-
- /**
- * Gets the side-effects (if any) from the remotely executed {@link Traversal}. Simple implementations could
- * likely use {@link DefaultTraversalSideEffects}, but more advanced implementations might look to lazily load
- * side-effects or otherwise implement some form of blocking to ensure that all side-effects are present from the
- * remote location.
- */
- public TraversalSideEffects getSideEffects();
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/AbstractRemoteTraversal.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/AbstractRemoteTraversal.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/AbstractRemoteTraversal.java
new file mode 100644
index 0000000..28eeae8
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/AbstractRemoteTraversal.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.remote.traversal;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.TraverserGenerator;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public abstract class AbstractRemoteTraversal<S,E> implements RemoteTraversal<S,E> {
+ @Override
+ public Bytecode getBytecode() {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public List<Step> getSteps() {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public <S2, E2> Admin<S2, E2> addStep(final int index, final Step<?, ?> step) throws IllegalStateException {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public <S2, E2> Admin<S2, E2> removeStep(final int index) throws IllegalStateException {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public void applyStrategies() throws IllegalStateException {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public TraverserGenerator getTraverserGenerator() {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public Set<TraverserRequirement> getTraverserRequirements() {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public void setSideEffects(final TraversalSideEffects sideEffects) {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public void setStrategies(final TraversalStrategies strategies) {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public TraversalStrategies getStrategies() {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public void setParent(final TraversalParent step) {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public TraversalParent getParent() {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public Admin<S, E> clone() {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public boolean isLocked() {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public Optional<Graph> getGraph() {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public void setGraph(final Graph graph) {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/AbstractRemoteTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/AbstractRemoteTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/AbstractRemoteTraversalSideEffects.java
new file mode 100644
index 0000000..9da0754
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/AbstractRemoteTraversalSideEffects.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.remote.traversal;
+
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+
+import java.util.Optional;
+import java.util.function.BinaryOperator;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public abstract class AbstractRemoteTraversalSideEffects implements RemoteTraversalSideEffects {
+
+ @Override
+ public void set(final String key, final Object value) throws IllegalArgumentException {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public void remove(final String key) {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public <V> void register(final String key, final Supplier<V> initialValue, BinaryOperator<V> reducer) {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public <V> void registerIfAbsent(final String key, final Supplier<V> initialValue, BinaryOperator<V> reducer) {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public <V> BinaryOperator<V> getReducer(final String key) throws IllegalArgumentException {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public <V> Supplier<V> getSupplier(final String key) throws IllegalArgumentException {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public void add(final String key, final Object value) throws IllegalArgumentException {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public <S> void setSack(final Supplier<S> initialValue, UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public <S> Supplier<S> getSackInitialValue() {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public <S> UnaryOperator<S> getSackSplitter() {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public <S> BinaryOperator<S> getSackMerger() {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public TraversalSideEffects clone() {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public void mergeInto(final TraversalSideEffects sideEffects) {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public void registerSupplier(final String key, final Supplier supplier) {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) {
+ throw new UnsupportedOperationException("Remote traversals do not support this method");
+ }
+
+ @Override
+ public String toString() {
+ return StringFactory.traversalSideEffectsString(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java
new file mode 100644
index 0000000..1f2ac74
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.remote.traversal;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public interface RemoteTraversal<S,E> extends Traversal.Admin<S,E> {
+ @Override
+ public RemoteTraversalSideEffects getSideEffects();
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversalSideEffects.java
new file mode 100644
index 0000000..e41a42d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversalSideEffects.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.remote.traversal;
+
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public interface RemoteTraversalSideEffects extends TraversalSideEffects {
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
index 53b1378..387ef96 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
@@ -20,14 +20,14 @@ package org.apache.tinkerpop.gremlin.process.remote.traversal.step.map;
import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
import org.apache.tinkerpop.gremlin.process.remote.RemoteConnectionException;
-import org.apache.tinkerpop.gremlin.process.remote.RemoteResponse;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversal;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraverser;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import java.util.Iterator;
import java.util.NoSuchElementException;
/**
@@ -39,7 +39,7 @@ import java.util.NoSuchElementException;
public final class RemoteStep<S, E> extends AbstractStep<S, E> {
private transient RemoteConnection remoteConnection;
- private Iterator<Traverser.Admin<E>> remoteIterator;
+ private RemoteTraversal<?,E> remoteTraversal;
private Bytecode bytecode;
@SuppressWarnings("unchecked")
@@ -57,16 +57,15 @@ public final class RemoteStep<S, E> extends AbstractStep<S, E> {
@SuppressWarnings("unchecked")
@Override
protected Traverser.Admin<E> processNextStart() throws NoSuchElementException {
- if (null == this.remoteIterator) {
+ if (null == this.remoteTraversal) {
try {
- final RemoteResponse remoteResponse = this.remoteConnection.submit(this.bytecode);
- this.remoteIterator = remoteResponse.getTraversers();
- this.traversal.setSideEffects(remoteResponse.getSideEffects());
+ remoteTraversal = this.remoteConnection.submit(this.bytecode);
+ this.traversal.setSideEffects(remoteTraversal.getSideEffects());
} catch (final RemoteConnectionException sce) {
throw new IllegalStateException(sce);
}
}
- return this.remoteIterator.next();
+ return (Traverser.Admin<E>) this.remoteTraversal.next();
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 3716411..5183493 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -556,6 +556,15 @@ public abstract class Client {
}
@Override
+ public CompletableFuture<ResultSet> submitAsync(final RequestMessage msg) {
+ final RequestMessage.Builder builder = RequestMessage.from(msg);
+ if (!aliases.isEmpty())
+ builder.addArg(Tokens.ARGS_ALIASES, aliases);
+
+ return super.submitAsync(builder.create());
+ }
+
+ @Override
public CompletableFuture<ResultSet> submitAsync(final Traversal traversal) {
return submitAsync(traversal.asAdmin().getBytecode());
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 22e48fe..1fb77f1 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -230,7 +230,7 @@ final class Connection {
final ResultQueue handler = new ResultQueue(resultLinkedBlockingQueue, readCompleted);
pending.put(requestMessage.getRequestId(), handler);
- future.complete(new ResultSet(handler, cluster.executor(), readCompleted));
+ future.complete(new ResultSet(handler, cluster.executor(), readCompleted, requestMessage));
}
});
channel.writeAndFlush(requestMessage, promise);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index c9f838a..793f6a6 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -193,23 +193,22 @@ final class Handler {
if (data instanceof List) {
// unrolls the collection into individual results to be handled by the queue.
final List<Object> listToUnroll = (List<Object>) data;
- listToUnroll.forEach(item -> tryUnrollBulkedResult(queue, item));
+ listToUnroll.forEach(item -> tryUnrollTraverser(queue, item));
} else {
// since this is not a list it can just be added to the queue
- tryUnrollBulkedResult(queue, response.getResult().getData());
+ tryUnrollTraverser(queue, response.getResult().getData());
}
} else {
// this is the side-effect from the server which is generated from a serialized traversal
- final String sideEffectKey = meta.get(Tokens.ARGS_SIDE_EFFECT).toString();
final String aggregateTo = meta.getOrDefault(Tokens.ARGS_AGGREGATE_TO, Tokens.VAL_AGGREGATE_TO_NONE).toString();
if (data instanceof List) {
// unrolls the collection into individual results to be handled by the queue.
final List<Object> listOfSideEffects = (List<Object>) data;
- listOfSideEffects.forEach(sideEffect -> queue.addSideEffect(sideEffectKey, aggregateTo, sideEffect));
+ listOfSideEffects.forEach(sideEffect -> queue.addSideEffect(aggregateTo, sideEffect));
} else {
// since this is not a list it can just be added to the queue. this likely shouldn't occur
// however as the protocol will typically push everything to list first.
- queue.addSideEffect(sideEffectKey, aggregateTo, data);
+ queue.addSideEffect(aggregateTo, data);
}
}
} else {
@@ -228,10 +227,9 @@ final class Handler {
}
}
- private void tryUnrollBulkedResult(final ResultQueue queue, final Object item) {
+ private void tryUnrollTraverser(final ResultQueue queue, final Object item) {
if (unrollTraversers) {
if (item instanceof Traverser.Admin) {
- // TODO: i think this is just temporary code - needed for backward compatibility to the old way of serializing Traversal with java serialization
final Traverser.Admin t = (Traverser.Admin) item;
final Object result = t.get();
for (long ix = 0; ix < t.bulk(); ix++) {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index 490b3f7..c499a20 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -27,11 +27,9 @@ import org.javatuples.Pair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -48,7 +46,7 @@ final class ResultQueue {
private final LinkedBlockingQueue<Result> resultLinkedBlockingQueue;
- private final Map<String, Object> sideEffectResult = new LinkedHashMap<>();
+ private Object aggregatedResult = null;
private final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -76,79 +74,52 @@ final class ResultQueue {
* is only returned when a {@link Traversal} is submitted and refers to the side-effects defined in that traversal.
* A "script" will not return side-effects.
*
- * @param k the key of the side-effect
* @param aggregateTo the value of the {@link ResponseMessage} metadata for {@link Tokens#ARGS_AGGREGATE_TO}.
* @param sideEffectValue the value of the side-effect itself
*/
- public void addSideEffect(final String k, final String aggregateTo, final Object sideEffectValue) {
+ public void addSideEffect(final String aggregateTo, final Object sideEffectValue) {
switch (aggregateTo) {
case Tokens.VAL_AGGREGATE_TO_BULKSET:
if (!(sideEffectValue instanceof Traverser.Admin))
- throw new IllegalStateException(String.format("Side-effect \"%s\" value %s is a %s which does not aggregate to %s",
- k, sideEffectValue, sideEffectValue.getClass().getSimpleName(), aggregateTo));
+ throw new IllegalStateException(String.format("Side-effect value %s is a %s which does not aggregate to %s",
+ sideEffectValue, sideEffectValue.getClass().getSimpleName(), aggregateTo));
- if (!sideEffectResult.containsKey(k))
- putIfAbsent(k, new BulkSet());
+ if (null == aggregatedResult) aggregatedResult = new BulkSet();
- final BulkSet<Object> bs = validateAndGet(k, aggregateTo, BulkSet.class);
+ final BulkSet<Object> bs = validate(aggregateTo, BulkSet.class);
final Traverser.Admin traverser = (Traverser.Admin) sideEffectValue;
bs.add(traverser.get(), traverser.bulk());
break;
case Tokens.VAL_AGGREGATE_TO_LIST:
- if (!sideEffectResult.containsKey(k))
- putIfAbsent(k, new ArrayList());
-
- final List<Object> list = validateAndGet(k, aggregateTo, List.class);
+ if (null == aggregatedResult) aggregatedResult = new ArrayList();
+ final List<Object> list = validate(aggregateTo, List.class);
list.add(sideEffectValue);
break;
case Tokens.VAL_AGGREGATE_TO_MAP:
if (!(sideEffectValue instanceof Map.Entry))
- throw new IllegalStateException(String.format("Side-effect \"%s\" value %s is a %s which does not aggregate to %s",
- k, sideEffectValue, sideEffectValue.getClass().getSimpleName(), aggregateTo));
+ throw new IllegalStateException(String.format("Side-effect value %s is a %s which does not aggregate to %s",
+ sideEffectValue, sideEffectValue.getClass().getSimpleName(), aggregateTo));
- if (!sideEffectResult.containsKey(k))
- putIfAbsent(k, new HashMap());
+ if (null == aggregatedResult) aggregatedResult = new HashMap();
- final Map<Object,Object > m = validateAndGet(k, aggregateTo, Map.class);
+ final Map<Object,Object > m = validate(aggregateTo, Map.class);
final Map.Entry entry = (Map.Entry) sideEffectValue;
m.put(entry.getKey(), entry.getValue());
break;
case Tokens.VAL_AGGREGATE_TO_NONE:
- if (!sideEffectResult.containsKey(k))
- putIfAbsent(k, sideEffectValue);
+ if (null == aggregatedResult) aggregatedResult = sideEffectValue;
break;
default:
throw new IllegalStateException(String.format("%s is an invalid value for %s", aggregateTo, Tokens.ARGS_AGGREGATE_TO));
}
}
- private <V> V validateAndGet(final String k, final String aggregateTo, final Class<?> expected) {
- final Object shouldBe = sideEffectResult.get(k);
- if (!(expected.isAssignableFrom(shouldBe.getClass())))
+ private <V> V validate(final String aggregateTo, final Class<?> expected) {
+ if (!(expected.isAssignableFrom(aggregatedResult.getClass())))
throw new IllegalStateException(String.format("Side-effect \"%s\" contains the type %s that is not acceptable for %s",
- k, shouldBe.getClass().getSimpleName(), aggregateTo));
-
- return (V) shouldBe;
- }
-
- /**
- * Gets the keys gather for the side-effect. If the queue is still filling (i.e. the read is not complete) then
- * there could be inconsistent results depending on when this method is called. It would be best to wait to call
- * this method on {@link #readComplete}.
- */
- public Set<String> getSideEffectKeys() {
- return sideEffectResult.keySet();
- }
+ aggregatedResult.getClass().getSimpleName(), aggregateTo));
- /**
- * Gets the current values for the side-effect. If the queue is still filling (i.e. the read is not complete) then
- * there could be inconsistent results depending on when this method is called. It would be best to wait to call
- * this method on {@link #readComplete}.
- *
- * @param k the key of the side-effect
- */
- public <V> V getSideEffect(final String k) {
- return (V) sideEffectResult.get(k);
+ return (V) aggregatedResult;
}
public CompletableFuture<List<Result>> await(final int items) {
@@ -177,6 +148,12 @@ final class ResultQueue {
void markComplete() {
this.readComplete.complete(null);
+
+ // if there was some aggregation performed in the queue then the full object is hanging out waiting to be
+ // added to the ResultSet
+ if (aggregatedResult != null)
+ add(new Result(aggregatedResult));
+
this.drainAllWaiting();
}
@@ -186,10 +163,6 @@ final class ResultQueue {
this.drainAllWaiting();
}
- private synchronized void putIfAbsent(final String key, final Object o) {
- sideEffectResult.putIfAbsent(key, o);
- }
-
/**
* Completes the next waiting future if there is one.
*/
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index 81fa49f..ed93fa3 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -18,11 +18,11 @@
*/
package org.apache.tinkerpop.gremlin.driver;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
@@ -50,24 +50,20 @@ import java.util.stream.StreamSupport;
public final class ResultSet implements Iterable<Result> {
private final ResultQueue resultQueue;
private final ExecutorService executor;
+ private final RequestMessage originalRequestMessage;
private final CompletableFuture<Void> readCompleted;
public ResultSet(final ResultQueue resultQueue, final ExecutorService executor,
- final CompletableFuture<Void> readCompleted) {
+ final CompletableFuture<Void> readCompleted, final RequestMessage originalRequestMessage) {
this.executor = executor;
this.resultQueue = resultQueue;
this.readCompleted = readCompleted;
+ this.originalRequestMessage = originalRequestMessage;
}
- public CompletableFuture<Map<String,Object>> getSideEffectResults() {
- final CompletableFuture<Map<String,Object>> future = new CompletableFuture<>();
- readCompleted.thenRunAsync(() -> {
- final Map<String,Object> se = new HashMap<>();
- resultQueue.getSideEffectKeys().forEach(k -> se.put(k, resultQueue.getSideEffect(k)));
- future.complete(se);
- }, executor);
- return future;
+ public RequestMessage getOriginalRequestMessage() {
+ return originalRequestMessage;
}
/**
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
index 00858ff..5542f60 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
@@ -33,6 +33,9 @@ public final class Tokens {
public static final String OPS_BYTECODE = "bytecode";
public static final String OPS_EVAL = "eval";
public static final String OPS_INVALID = "invalid";
+ public static final String OPS_GATHER = "gather";
+ public static final String OPS_KEYS = "keys";
+ public static final String OPS_CLOSE = "close";
/**
* @deprecated As for release 3.2.2, not replaced as this feature was never really published as official.
@@ -44,12 +47,6 @@ public final class Tokens {
* @deprecated As for release 3.2.2, not replaced as this feature was never really published as official.
*/
@Deprecated
- public static final String OPS_CLOSE = "close";
-
- /**
- * @deprecated As for release 3.2.2, not replaced as this feature was never really published as official.
- */
- @Deprecated
public static final String OPS_IMPORT = "import";
/**
@@ -82,6 +79,7 @@ public final class Tokens {
public static final String ARGS_SASL_MECHANISM = "saslMechanism";
public static final String ARGS_SIDE_EFFECT = "sideEffect";
public static final String ARGS_AGGREGATE_TO = "aggregateTo";
+ public static final String ARGS_SIDE_EFFECT_KEY = "sideEffectKey";
/**
* @deprecated As of release 3.1.0-incubating, replaced by {@link #ARGS_ALIASES}.
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java
index 3b9a0a5..7a2ad3d 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java
@@ -95,6 +95,14 @@ public final class RequestMessage {
return o == null ? Optional.empty() : Optional.of((T) o);
}
+ public static Builder from(final RequestMessage msg) {
+ final Builder builder = build(msg.op)
+ .overrideRequestId(msg.requestId)
+ .processor(msg.processor);
+ msg.args.forEach(builder::addArg);
+ return builder;
+ }
+
public static Builder build(final String op) {
return new Builder(op);
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
index 1cff037..d65354c 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
@@ -26,7 +26,7 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decorati
import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
import org.apache.tinkerpop.gremlin.process.remote.RemoteConnectionException;
import org.apache.tinkerpop.gremlin.process.remote.RemoteGraph;
-import org.apache.tinkerpop.gremlin.process.remote.RemoteResponse;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -167,9 +167,9 @@ public class DriverRemoteConnection implements RemoteConnection {
if (attachElements && !t.asAdmin().getStrategies().getStrategy(VertexProgramStrategy.class).isPresent()) {
if (!conf.isPresent()) throw new IllegalStateException("Traverser can't be reattached for testing");
final Graph graph = ((Supplier<Graph>) conf.get().getProperty("hidden.for.testing.only")).get();
- return new DriverRemoteResponse.AttachingTraverserIterator<>(client.submit(t.asAdmin().getBytecode()).iterator(), graph);
+ return new DriverRemoteTraversal.AttachingTraverserIterator<>(client.submit(t.asAdmin().getBytecode()).iterator(), graph);
} else {
- return new DriverRemoteResponse.TraverserIterator<>(client.submit(t.asAdmin().getBytecode()).iterator());
+ return new DriverRemoteTraversal.TraverserIterator<>(client.submit(t.asAdmin().getBytecode()).iterator());
}
} catch (Exception ex) {
throw new RemoteConnectionException(ex);
@@ -177,10 +177,10 @@ public class DriverRemoteConnection implements RemoteConnection {
}
@Override
- public <E> RemoteResponse<E> submit(final Bytecode bytecode) throws RemoteConnectionException {
+ public <E> RemoteTraversal<?,E> submit(final Bytecode bytecode) throws RemoteConnectionException {
try {
final ResultSet rs = client.submit(bytecode);
- return new DriverRemoteResponse<>(rs, attachElements, conf);
+ return new DriverRemoteTraversal<>(rs, client, attachElements, conf);
} catch (Exception ex) {
throw new RemoteConnectionException(ex);
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteResponse.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteResponse.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteResponse.java
deleted file mode 100644
index 75a4e26..0000000
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteResponse.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.driver.remote;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.driver.Result;
-import org.apache.tinkerpop.gremlin.driver.ResultSet;
-import org.apache.tinkerpop.gremlin.process.remote.RemoteResponse;
-import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraverser;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.structure.Element;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Property;
-import org.apache.tinkerpop.gremlin.structure.util.Attachable;
-
-import java.util.Iterator;
-import java.util.Optional;
-import java.util.function.Supplier;
-
-/**
- * A {@link RemoteResponse} implementation for the Gremlin Driver.
- *
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-public class DriverRemoteResponse<E> implements RemoteResponse<E> {
-
- private final Iterator<Traverser.Admin<E>> resultIterator;
- private final TraversalSideEffects sideEffects;
-
- public DriverRemoteResponse(final ResultSet rs, final boolean attach, Optional<Configuration> conf) {
- // attaching is really just for testing purposes. it doesn't make sense in any real-world scenario as it would
- // require that the client have access to the Graph instance that produced the result. tests need that
- // attachment process to properly execute in full hence this little hack.
- if (attach) {
- if (!conf.isPresent()) throw new IllegalStateException("Traverser can't be reattached for testing");
- final Graph graph = ((Supplier<Graph>) conf.get().getProperty("hidden.for.testing.only")).get();
- resultIterator = new AttachingTraverserIterator<>(rs.iterator(), graph);
- } else {
- resultIterator = new TraverserIterator<>(rs.iterator());
- }
-
- sideEffects = new DriverTraversalSideEffects(rs);
- }
-
- @Override
- public Iterator<Traverser.Admin<E>> getTraversers() {
- return resultIterator;
- }
-
- @Override
- public TraversalSideEffects getSideEffects() {
- return sideEffects;
- }
-
- static class TraverserIterator<E> implements Iterator<Traverser.Admin<E>> {
-
- private final Iterator<Result> inner;
-
- public TraverserIterator(final Iterator<Result> resultIterator) {
- inner = resultIterator;
- }
-
- @Override
- public boolean hasNext() {
- return inner.hasNext();
- }
-
- @Override
- public Traverser.Admin<E> next() {
- return (RemoteTraverser<E>) inner.next().getObject();
- }
- }
-
- static class AttachingTraverserIterator<E> extends TraverserIterator<E> {
- private final Graph graph;
-
- public AttachingTraverserIterator(final Iterator<Result> resultIterator, final Graph graph) {
- super(resultIterator);
- this.graph = graph;
- }
-
- @Override
- public Traverser.Admin<E> next() {
- final Traverser.Admin<E> traverser = super.next();
- if (traverser.get() instanceof Attachable && !(traverser.get() instanceof Property))
- traverser.set((E) ((Attachable<Element>) traverser.get()).attach(Attachable.Method.get(graph)));
- return traverser;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java
new file mode 100644
index 0000000..e3a8612
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.remote;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.driver.Client;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.AbstractRemoteTraversal;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraverser;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * A {@link AbstractRemoteTraversal} implementation for the Gremlin Driver.
+ *
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public class DriverRemoteTraversal<S,E> extends AbstractRemoteTraversal<S,E> {
+
+ private final Iterator<Traverser.Admin<E>> resultIterator;
+ private final RemoteTraversalSideEffects sideEffects;
+
+ public DriverRemoteTraversal(final ResultSet rs, final Client client, final boolean attach, final Optional<Configuration> conf) {
+ // attaching is really just for testing purposes. it doesn't make sense in any real-world scenario as it would
+ // require that the client have access to the Graph instance that produced the result. tests need that
+ // attachment process to properly execute in full hence this little hack.
+ if (attach) {
+ if (!conf.isPresent()) throw new IllegalStateException("Traverser can't be reattached for testing");
+ final Graph graph = ((Supplier<Graph>) conf.get().getProperty("hidden.for.testing.only")).get();
+ resultIterator = new AttachingTraverserIterator<>(rs.iterator(), graph);
+ } else {
+ resultIterator = new TraverserIterator<>(rs.iterator());
+ }
+
+ sideEffects = new DriverRemoteTraversalSideEffects(client, rs.getOriginalRequestMessage().getRequestId());
+ }
+
+ @Override
+ public RemoteTraversalSideEffects getSideEffects() {
+ return sideEffects;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return resultIterator.hasNext();
+ }
+
+ @Override
+ public E next() {
+ return (E) resultIterator.next();
+ }
+
+ static class TraverserIterator<E> implements Iterator<Traverser.Admin<E>> {
+
+ private final Iterator<Result> inner;
+
+ public TraverserIterator(final Iterator<Result> resultIterator) {
+ inner = resultIterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return inner.hasNext();
+ }
+
+ @Override
+ public Traverser.Admin<E> next() {
+ return (RemoteTraverser<E>) inner.next().getObject();
+ }
+ }
+
+ static class AttachingTraverserIterator<E> extends TraverserIterator<E> {
+ private final Graph graph;
+
+ public AttachingTraverserIterator(final Iterator<Result> resultIterator, final Graph graph) {
+ super(resultIterator);
+ this.graph = graph;
+ }
+
+ @Override
+ public Traverser.Admin<E> next() {
+ final Traverser.Admin<E> traverser = super.next();
+ if (traverser.get() instanceof Attachable && !(traverser.get() instanceof Property))
+ traverser.set((E) ((Attachable<Element>) traverser.get()).attach(Attachable.Method.get(graph)));
+ return traverser;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java
new file mode 100644
index 0000000..c3c75f7
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.remote;
+
+import org.apache.tinkerpop.gremlin.driver.Client;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.AbstractRemoteTraversalSideEffects;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public class DriverRemoteTraversalSideEffects extends AbstractRemoteTraversalSideEffects {
+
+ private final Client client;
+ private Set<String> keys = null;
+ private final UUID serverSideEffect;
+
+ private final Map<String, Object> sideEffects = new HashMap<>();
+
+ public DriverRemoteTraversalSideEffects(final Client client, final UUID serverSideEffect) {
+ this.client = client;
+ this.serverSideEffect = serverSideEffect;
+ }
+
+ @Override
+ public <V> V get(final String key) throws IllegalArgumentException {
+ if (!sideEffects.containsKey(key)) {
+ final RequestMessage msg = RequestMessage.build(Tokens.OPS_GATHER)
+ .addArg(Tokens.ARGS_SIDE_EFFECT, serverSideEffect)
+ .addArg(Tokens.ARGS_SIDE_EFFECT_KEY, key)
+ .processor("traversal").create();
+ try {
+ final Result result = client.submitAsync(msg).get().one();
+ sideEffects.put(key, null == result ? null : result.getObject());
+ } catch (Exception ex) {
+ throw new RuntimeException("Could not get cache value", ex);
+ }
+ }
+
+ return (V) sideEffects.get(key);
+ }
+
+ @Override
+ public Set<String> keys() {
+ if (null == keys) {
+ final RequestMessage msg = RequestMessage.build(Tokens.OPS_KEYS)
+ .addArg(Tokens.ARGS_SIDE_EFFECT, serverSideEffect)
+ .processor("traversal").create();
+ try {
+ keys = client.submitAsync(msg).get().all().get().stream().map(r -> r.getString()).collect(Collectors.toSet());
+ } catch (Exception ex) {
+ throw new RuntimeException("Could not get keys", ex);
+ }
+ }
+
+ return keys;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverTraversalSideEffects.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverTraversalSideEffects.java
deleted file mode 100644
index 6996df4..0000000
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverTraversalSideEffects.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.driver.remote;
-
-import org.apache.tinkerpop.gremlin.driver.ResultSet;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
-import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.BinaryOperator;
-import java.util.function.Supplier;
-import java.util.function.UnaryOperator;
-
-/**
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-public class DriverTraversalSideEffects implements TraversalSideEffects {
-
- private final ResultSet rs;
- private CompletableFuture<Map<String, Object>> future = null;
-
- public DriverTraversalSideEffects(final ResultSet rs) {
- this.rs = rs;
- }
-
- @Override
- public <V> V get(final String key) throws IllegalArgumentException {
- initializeFuture();
- try {
- return (V) future.get().get(key);
- } catch (Exception ex) {
- // TODO: PREEEEEEEEEEETTTTTY DUMPY
- throw new RuntimeException(ex);
- }
- }
-
- @Override
- public void set(String key, Object value) throws IllegalArgumentException {
-
- }
-
- @Override
- public void remove(String key) {
-
- }
-
- @Override
- public Set<String> keys() {
- initializeFuture();
- try {
- return future.get().keySet();
- } catch (Exception ex) {
- // TODO: PREEEEEEEEEEETTTTTY DUMPY
- throw new RuntimeException(ex);
- }
- }
-
- @Override
- public <V> void register(String key, Supplier<V> initialValue, BinaryOperator<V> reducer) {
-
- }
-
- @Override
- public <V> void registerIfAbsent(String key, Supplier<V> initialValue, BinaryOperator<V> reducer) {
-
- }
-
- @Override
- public <V> BinaryOperator<V> getReducer(String key) throws IllegalArgumentException {
- return null;
- }
-
- @Override
- public <V> Supplier<V> getSupplier(String key) throws IllegalArgumentException {
- return null;
- }
-
- @Override
- public void add(String key, Object value) throws IllegalArgumentException {
-
- }
-
- @Override
- public <S> void setSack(Supplier<S> initialValue, UnaryOperator<S> splitOperator, BinaryOperator<S> mergeOperator) {
-
- }
-
- @Override
- public <S> Supplier<S> getSackInitialValue() {
- return null;
- }
-
- @Override
- public <S> UnaryOperator<S> getSackSplitter() {
- return null;
- }
-
- @Override
- public <S> BinaryOperator<S> getSackMerger() {
- return null;
- }
-
- @Override
- public TraversalSideEffects clone() {
- return null;
- }
-
- @Override
- public void mergeInto(TraversalSideEffects sideEffects) {
-
- }
-
- @Override
- public void registerSupplier(String key, Supplier supplier) {
-
- }
-
- @Override
- public <V> Optional<Supplier<V>> getRegisteredSupplier(String key) {
- return null;
- }
-
- @Override
- public String toString() {
- return StringFactory.traversalSideEffectsString(this);
- }
-
- private synchronized void initializeFuture() {
- if (null == future) future = rs.getSideEffectResults();
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java
index 7523e79..40adc27 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java
@@ -296,85 +296,54 @@ public class ResultQueueTest extends AbstractResultQueueTest {
}
@Test
- public void shouldHandleBulkSetSideEffects() {
- assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+ public void shouldHandleBulkSetSideEffects() throws Exception {
+ final CompletableFuture<List<Result>> o = resultQueue.await(1);
+ assertThat(o.isDone(), is(false));
- resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("stephen", 1));
- assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
- assertEquals(1, ((BulkSet) resultQueue.getSideEffect("a")).get("stephen"));
+ resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("brian", 2));
+ assertThat(o.isDone(), is(false));
- resultQueue.addSideEffect("b", Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("brian", 2));
- assertThat(resultQueue.getSideEffectKeys(), hasItem("b"));
- assertEquals(2, ((BulkSet) resultQueue.getSideEffect("b")).get("brian"));
+ resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("brian", 2));
+ assertThat(o.isDone(), is(false));
- resultQueue.addSideEffect("b", Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("brian", 2));
- assertThat(resultQueue.getSideEffectKeys(), hasItem("b"));
- assertEquals(4, ((BulkSet) resultQueue.getSideEffect("b")).get("brian"));
+ resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("belinda", 6));
+ assertThat(o.isDone(), is(false));
- resultQueue.addSideEffect("b", Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("belinda", 6));
- assertThat(resultQueue.getSideEffectKeys(), hasItem("b"));
- assertEquals(6, ((BulkSet) resultQueue.getSideEffect("b")).get("belinda"));
+ resultQueue.markComplete();
+ assertThat(o.isDone(), is(true));
+ final BulkSet<String> bulkSet = o.get().get(0).get(BulkSet.class);
+ assertEquals(4, bulkSet.get("brian"));
+ assertEquals(6, bulkSet.get("belinda"));
}
@Test
- public void shouldNotMixAggregatesForBulkSet() {
- assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+ public void shouldHandleListSideEffects() throws Exception {
+ final CompletableFuture<List<Result>> o = resultQueue.await(1);
+ assertThat(o.isDone(), is(false));
- resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("stephen", 1));
- assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
- assertEquals(1, ((BulkSet) resultQueue.getSideEffect("a")).get("stephen"));
+ resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_LIST, "stephen");
+ assertThat(o.isDone(), is(false));
- try {
- resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_BULKSET, Arrays.asList("stephen", "kathy", "alice"));
- } catch (Exception ex) {
- assertThat(ex, instanceOf(IllegalStateException.class));
- assertEquals("Side-effect \"a\" value [stephen, kathy, alice] is a ArrayList which does not aggregate to bulkset", ex.getMessage());
- }
- }
-
- @Test
- public void shouldHandleListSideEffects() {
- assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
-
- resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_LIST, "stephen");
- assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
- List<String> l = resultQueue.getSideEffect("a");
- assertEquals(1, l.size());
- assertEquals("stephen", l.get(0));
-
- resultQueue.addSideEffect("d", Tokens.VAL_AGGREGATE_TO_LIST, "daniel");
- assertThat(resultQueue.getSideEffectKeys(), hasItem("d"));
- l = resultQueue.getSideEffect("d");
- assertEquals(1, l.size());
- assertEquals("daniel", l.get(0));
-
- resultQueue.addSideEffect("d", Tokens.VAL_AGGREGATE_TO_LIST, "dave");
- assertThat(resultQueue.getSideEffectKeys(), hasItem("d"));
- l = resultQueue.getSideEffect("d");
- assertEquals(2, l.size());
- assertThat(l, contains("daniel","dave"));
- }
+ resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_LIST, "daniel");
+ assertThat(o.isDone(), is(false));
- @Test
- public void shouldNotMixAggregatesForList() {
- assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+ resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_LIST, "dave");
+ assertThat(o.isDone(), is(false));
- resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("stephen", 1));
- assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
- assertEquals(1, ((BulkSet) resultQueue.getSideEffect("a")).get("stephen"));
+ resultQueue.markComplete();
- try {
- resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_LIST, Arrays.asList("stephen", "kathy", "alice"));
- } catch (Exception ex) {
- assertThat(ex, instanceOf(IllegalStateException.class));
- assertEquals("Side-effect \"a\" contains the type BulkSet that is not acceptable for list", ex.getMessage());
- }
+ assertThat(o.isDone(), is(true));
+ final List<String> list = o.get().get(0).get(ArrayList.class);
+ assertEquals("stephen", list.get(0));
+ assertEquals("daniel", list.get(1));
+ assertEquals("dave", list.get(2));
}
@Test
- public void shouldHandleMapSideEffects() {
- assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+ public void shouldHandleMapSideEffects() throws Exception {
+ final CompletableFuture<List<Result>> o = resultQueue.await(1);
+ assertThat(o.isDone(), is(false));
final Map<String,String> m = new HashMap<>();
m.put("s", "stephen");
@@ -382,47 +351,38 @@ public class ResultQueueTest extends AbstractResultQueueTest {
m.put("d", "daniel");
m.entrySet().forEach(e -> {
- resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_MAP, e);
- assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
- assertEquals(e.getValue(), ((Map) resultQueue.getSideEffect("a")).get(e.getKey()));
+ resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_MAP, e);
+ assertThat(o.isDone(), is(false));
});
- assertEquals(3, ((Map) resultQueue.getSideEffect("a")).size());
- }
-
- @Test
- public void shouldNotMixAggregatesForMap() {
- assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
-
- final Map<String,String> m = new HashMap<>();
- m.put("s", "stephen");
-
- resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_MAP, m.entrySet().iterator().next());
- assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
- assertEquals("stephen", ((Map) resultQueue.getSideEffect("a")).get("s"));
+ resultQueue.markComplete();
- try {
- resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_MAP, Arrays.asList("stephen", "kathy", "alice"));
- } catch (Exception ex) {
- assertThat(ex, instanceOf(IllegalStateException.class));
- assertEquals("Side-effect \"a\" value [stephen, kathy, alice] is a ArrayList which does not aggregate to map", ex.getMessage());
- }
+ assertThat(o.isDone(), is(true));
+ final Map<String, String> list = o.get().get(0).get(HashMap.class);
+ assertEquals("stephen", list.get("s"));
+ assertEquals("daniel", list.get("d"));
+ assertEquals("marko", list.get("m"));
}
+
@Test
- public void shouldHandleNotAggregateSideEffects() {
- assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+ public void shouldHandleNotAggregateSideEffects() throws Exception {
+ final CompletableFuture<List<Result>> o = resultQueue.await(1);
+ assertThat(o.isDone(), is(false));
final Map<String,String> m = new HashMap<>();
m.put("s", "stephen");
m.put("m", "marko");
m.put("d", "daniel");
- resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_NONE, m);
- assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
- assertEquals("stephen", ((Map) resultQueue.getSideEffect("a")).get("s"));
- assertEquals("marko", ((Map) resultQueue.getSideEffect("a")).get("m"));
- assertEquals("daniel", ((Map) resultQueue.getSideEffect("a")).get("d"));
- assertEquals(3, ((Map) resultQueue.getSideEffect("a")).size());
+ resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_NONE, m);
+
+ resultQueue.markComplete();
+
+ assertThat(o.isDone(), is(true));
+ final Map<String, String> list = o.get().get(0).get(HashMap.class);
+ assertEquals("stephen", list.get("s"));
+ assertEquals("daniel", list.get("d"));
+ assertEquals("marko", list.get("m"));
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
index f5588ab..768ecc1 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.driver;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.junit.Before;
import org.junit.Test;
@@ -44,7 +45,7 @@ public class ResultSetTest extends AbstractResultQueueTest {
@Before
public void setupThis() {
- resultSet = new ResultSet(resultQueue, pool, readCompleted);
+ resultSet = new ResultSet(resultQueue, pool, readCompleted, RequestMessage.build("traversal").create());
}
@Test
@@ -194,30 +195,4 @@ public class ResultSetTest extends AbstractResultQueueTest {
assertEquals(100, counter.get());
}
-
- @Test
- public void shouldRetrieveSideEffects() throws Exception {
- final Iterator itty = resultSet.iterator();
- final CompletableFuture<Map<String,Object>> sideEffects = resultSet.getSideEffectResults();
-
- assertThat(sideEffects.isDone(), is(false));
-
- // queue is not marked finished so the side effect future is still not complete
- addToQueue(100, 1, true, false);
-
- for (int i = 0; i < 101; i++) {
- assertThat(itty.hasNext(), is(true));
- }
-
- // now complete the queue
- addToQueue(0, 1, true, true, 0);
-
- // addToQueue doesn't block for "read complete" so gotta spin the thread
- while (!readCompleted.isDone()) {
- Thread.sleep(10);
- }
-
- // side effects are empty in this case, but that's fine for the purpose of this test
- assertThat(sideEffects.isDone(), is(true));
- }
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-server/pom.xml
----------------------------------------------------------------------
diff --git a/gremlin-server/pom.xml b/gremlin-server/pom.xml
index 2e7fc35..971baca 100644
--- a/gremlin-server/pom.xml
+++ b/gremlin-server/pom.xml
@@ -45,6 +45,11 @@ limitations under the License.
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <version>2.3.1</version>
+ </dependency>
<!-- METRICS -->
<dependency>
<groupId>com.codahale.metrics</groupId>