You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/08/28 01:15:02 UTC
[2/2] tinkerpop git commit: Added a way to extend the "end" behavior
of result iteration.
Added a way to extend the "end" behavior of result iteration.
There seemed to be some kind of race condition where the side-effect cache would not get initialized in between the final message being sent back to the client and the request for the side-effect. Introducing this extension allows TraversalOpProcessor to add the traversal sideeffect to the cache just prior to flush of the last messge. CTR
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/e31a2cb4
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/e31a2cb4
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/e31a2cb4
Branch: refs/heads/master
Commit: e31a2cb4644d3babb4ae58387175e7c3905dc321
Parents: 4f6e769
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Sat Aug 27 21:12:50 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Sat Aug 27 21:12:50 2016 -0400
----------------------------------------------------------------------
.../gremlin/server/op/AbstractOpProcessor.java | 11 +++++++++++
.../server/op/traversal/TraversalOpProcessor.java | 12 ++++++++----
.../gremlin/server/util/TraversalIterator.java | 6 ++++++
3 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e31a2cb4/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
index a2598ef..d1d1dcb 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
@@ -179,6 +179,8 @@ public abstract class AbstractOpProcessor implements OpProcessor {
throw ex;
}
+ iterateComplete(ctx, msg, itty);
+
// the flush is called after the commit has potentially occurred. in this way, if a commit was
// required then it will be 100% complete before the client receives it. the "frame" at this point
// should have completely detached objects from the transaction (i.e. serialization has occurred)
@@ -211,6 +213,15 @@ public abstract class AbstractOpProcessor implements OpProcessor {
}
/**
+ * Called when iteration within {@link #handleIterator(Context, Iterator)} is on its final pass and the final
+ * frame is about to be sent back to the client. This method only gets called on successful iteration of the
+ * entire result.
+ */
+ protected void iterateComplete(final ChannelHandlerContext ctx, final RequestMessage msg, final Iterator itty) {
+ // do nothing by default
+ }
+
+ /**
* Determines if a {@link Frame} should be force flushed outside of the {@code resultIterationBatchSize} and the
* termination of the iterator. By default this method return {@code false}.
*
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e31a2cb4/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
index 2acea4a..beca097 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
@@ -107,7 +107,7 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
private static Cache<UUID, TraversalSideEffects> cache = null;
public TraversalOpProcessor() {
- super(true);
+ super(false);
}
@Override
@@ -367,9 +367,6 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
// compile the traversal - without it getEndStep() has nothing in it
traversal.applyStrategies();
handleIterator(context, new TraversalIterator(traversal));
-
- if (!traversal.getSideEffects().isEmpty())
- cache.put(msg.getRequestId(), traversal.getSideEffects());
} catch (TimeoutException ex) {
final String errorMessage = String.format("Response iteration exceeded the configured threshold for request [%s] - %s", msg.getRequestId(), ex.getMessage());
logger.warn(errorMessage);
@@ -400,6 +397,13 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
}
}
+ @Override
+ protected void iterateComplete(final ChannelHandlerContext ctx, final RequestMessage msg, final Iterator itty) {
+ final Traversal.Admin traversal = ((TraversalIterator) itty).getTraversal();
+ if (!traversal.getSideEffects().isEmpty())
+ cache.put(msg.getRequestId(), traversal.getSideEffects());
+ }
+
protected void beforeProcessing(final Graph graph, final Context ctx) {
if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback();
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e31a2cb4/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TraversalIterator.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TraversalIterator.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TraversalIterator.java
index 4b77ac0..041dc0b 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TraversalIterator.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TraversalIterator.java
@@ -30,10 +30,12 @@ import java.util.Iterator;
*/
public class TraversalIterator implements Iterator<Object> {
+ private final Traversal.Admin traversal;
private final Iterator<Object> traversalIterator;
private final HaltedTraverserStrategy haltedTraverserStrategy;
public TraversalIterator(final Traversal.Admin traversal) {
+ this.traversal = traversal;
this.traversalIterator = traversal.getEndStep();
this.haltedTraverserStrategy = traversal.getStrategies().getStrategy(HaltedTraverserStrategy.class).orElse(
Boolean.valueOf(System.getProperty("is.testing", "false")) ?
@@ -41,6 +43,10 @@ public class TraversalIterator implements Iterator<Object> {
HaltedTraverserStrategy.reference());
}
+ public Traversal.Admin getTraversal() {
+ return traversal;
+ }
+
@Override
public boolean hasNext() {
return this.traversalIterator.hasNext();