You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/08/05 20:11:16 UTC

[1/2] tinkerpop git commit: Major refactoring of TraversalOpProcessor work

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1278 13f2a1423 -> a6ab71be8


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
index bfcd014..914741f 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
@@ -19,6 +19,8 @@
 package org.apache.tinkerpop.gremlin.server.op.traversal;
 
 import com.codahale.metrics.Timer;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.tinkerpop.gremlin.driver.Tokens;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
@@ -27,6 +29,7 @@ import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.groovy.engine.ScriptEngines;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
 import org.apache.tinkerpop.gremlin.server.Context;
 import org.apache.tinkerpop.gremlin.server.GraphManager;
@@ -35,9 +38,10 @@ import org.apache.tinkerpop.gremlin.server.OpProcessor;
 import org.apache.tinkerpop.gremlin.server.op.AbstractOpProcessor;
 import org.apache.tinkerpop.gremlin.server.op.OpProcessorException;
 import org.apache.tinkerpop.gremlin.server.util.MetricManager;
+import org.apache.tinkerpop.gremlin.server.util.SideEffectIterator;
 import org.apache.tinkerpop.gremlin.server.util.TraversalIterator;
 import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.util.function.ThrowingConsumer;;
+import org.apache.tinkerpop.gremlin.util.function.ThrowingConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +51,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static com.codahale.metrics.MetricRegistry.name;
@@ -61,6 +67,11 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
     public static final String OP_PROCESSOR_NAME = "traversal";
     public static final Timer traversalOpTimer = MetricManager.INSTANCE.getTimer(name(GremlinServer.class, "op", "traversal"));
 
+    private static final Cache<UUID, TraversalSideEffects> cache = Caffeine.newBuilder()
+            .expireAfterWrite(10, TimeUnit.MINUTES)
+            .maximumSize(10_000)
+            .build();
+
     public TraversalOpProcessor() {
         super(true);
     }
@@ -92,6 +103,65 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
 
                 op = this::iterateBytecodeTraversal;
                 break;
+            case Tokens.OPS_GATHER:
+                final Optional<String> sideEffectForGather = message.optionalArgs(Tokens.ARGS_SIDE_EFFECT);
+                if (!sideEffectForGather.isPresent()) {
+                    final String msg = String.format("A message with an [%s] op code requires a [%s] argument.", Tokens.OPS_GATHER, Tokens.ARGS_SIDE_EFFECT);
+                    throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+                }
+
+                final Optional<String> sideEffectKey = message.optionalArgs(Tokens.ARGS_SIDE_EFFECT_KEY);
+                if (!sideEffectKey.isPresent()) {
+                    final String msg = String.format("A message with an [%s] op code requires a [%s] argument.", Tokens.OPS_GATHER, Tokens.ARGS_SIDE_EFFECT_KEY);
+                    throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+                }
+
+                validatedAliases(message);
+
+                op = this::gatherSideEffect;
+
+                break;
+            case Tokens.OPS_KEYS:
+                final Optional<String> sideEffectForKeys = message.optionalArgs(Tokens.ARGS_SIDE_EFFECT);
+                if (!sideEffectForKeys.isPresent()) {
+                    final String msg = String.format("A message with an [%s] op code requires a [%s] argument.", Tokens.OPS_GATHER, Tokens.ARGS_SIDE_EFFECT);
+                    throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+                }
+
+                validatedAliases(message);
+
+                op = context -> {
+                    final RequestMessage msg = context.getRequestMessage();
+                    logger.debug("Close request {} for in thread {}", msg.getRequestId(), Thread.currentThread().getName());
+
+                    final Optional<UUID> sideEffect = msg.optionalArgs(Tokens.ARGS_SIDE_EFFECT);
+                    final TraversalSideEffects sideEffects = cache.getIfPresent(sideEffect.get());
+
+                    if (null == sideEffects) {
+                        final String msgDefault = String.format("Could not find side-effects for %s.", sideEffect.get());
+                        throw new OpProcessorException(msgDefault, ResponseMessage.build(message).code(ResponseStatusCode.SERVER_ERROR).statusMessage(msgDefault).create());
+                    }
+
+                    handleIterator(context, sideEffects.keys().iterator());
+                };
+
+                break;
+            case Tokens.OPS_CLOSE:
+                final Optional<String> sideEffectForClose = message.optionalArgs(Tokens.ARGS_SIDE_EFFECT);
+                if (!sideEffectForClose.isPresent()) {
+                    final String msg = String.format("A message with an [%s] op code requires a [%s] argument.", Tokens.OPS_CLOSE, Tokens.ARGS_SIDE_EFFECT);
+                    throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+                }
+
+                op = context -> {
+                    final RequestMessage msg = context.getRequestMessage();
+                    logger.debug("Close request {} for in thread {}", msg.getRequestId(), Thread.currentThread().getName());
+
+                    final Optional<UUID> sideEffect = msg.optionalArgs(Tokens.ARGS_SIDE_EFFECT);
+                    cache.invalidate(sideEffect.get());
+                };
+
+                break;
             case Tokens.OPS_INVALID:
                 final String msgInvalid = String.format("Message could not be parsed.  Check the format of the request. [%s]", message);
                 throw new OpProcessorException(msgInvalid, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).statusMessage(msgInvalid).create());
@@ -109,6 +179,12 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
             throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
         }
 
+        final Optional<Map<String, String>> aliases = validatedAliases(message);
+
+        return aliases.get();
+    }
+
+    private static Optional<Map<String, String>> validatedAliases(RequestMessage message) throws OpProcessorException {
         final Optional<Map<String, String>> aliases = message.optionalArgs(Tokens.ARGS_ALIASES);
         if (!aliases.isPresent()) {
             final String msg = String.format("A message with an [%s] op code requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES);
@@ -119,8 +195,72 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
             final String msg = String.format("A message with an [%s] op code requires the [%s] argument to be a Map containing one alias assignment.", Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES);
             throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
         }
+        return aliases;
+    }
 
-        return aliases.get();
+    private void gatherSideEffect(final Context context) throws OpProcessorException {
+        final RequestMessage msg = context.getRequestMessage();
+        logger.debug("Side-effect request {} for in thread {}", msg.getRequestId(), Thread.currentThread().getName());
+
+        // earlier validation in selection of this op method should free us to cast this without worry
+        final Optional<UUID> sideEffect = msg.optionalArgs(Tokens.ARGS_SIDE_EFFECT);
+        final Optional<String> sideEffectKey = msg.optionalArgs(Tokens.ARGS_SIDE_EFFECT_KEY);
+        final Map<String, String> aliases = (Map<String, String>) msg.optionalArgs(Tokens.ARGS_ALIASES).get();
+
+        final GraphManager graphManager = context.getGraphManager();
+        final String traversalSourceName = aliases.entrySet().iterator().next().getValue();
+        final TraversalSource g = graphManager.getTraversalSources().get(traversalSourceName);
+
+        final Timer.Context timerContext = traversalOpTimer.time();
+        try {
+            final ChannelHandlerContext ctx = context.getChannelHandlerContext();
+            final Graph graph = g.getGraph();
+            final boolean supportsTransactions = graph.features().graph().supportsTransactions();
+
+            context.getGremlinExecutor().getExecutorService().submit(() -> {
+                try {
+                    if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+
+                    try {
+                        final TraversalSideEffects sideEffects = cache.getIfPresent(sideEffect.get());
+
+                        if (null == sideEffects) {
+                            final String errorMessage = String.format("Could not find side-effects for %s.", sideEffect.get());
+                            logger.warn(errorMessage);
+                            ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create());
+                            if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+                            return;
+                        }
+
+                        handleIterator(context, new SideEffectIterator(sideEffects.get(sideEffectKey.get()), sideEffectKey.get()));
+                    } catch (TimeoutException ex) {
+                        final String errorMessage = String.format("Response iteration exceeded the configured threshold for request [%s] - %s", msg.getRequestId(), ex.getMessage());
+                        logger.warn(errorMessage);
+                        ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create());
+                        if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+                        return;
+                    } catch (Exception ex) {
+                        logger.warn(String.format("Exception processing a Traversal on iteration for request [%s].", msg.getRequestId()), ex);
+                        ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create());
+                        if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+                        return;
+                    }
+
+                    if (graph.features().graph().supportsTransactions()) graph.tx().commit();
+                } catch (Exception ex) {
+                    logger.warn(String.format("Exception processing a Traversal on request [%s].", msg.getRequestId()), ex);
+                    ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create());
+                    if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback();
+                } finally {
+                    timerContext.stop();
+                }
+            });
+
+        } catch (Exception ex) {
+            timerContext.stop();
+            throw new OpProcessorException("Could not iterate the Traversal instance",
+                    ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create());
+        }
     }
 
     private void iterateBytecodeTraversal(final Context context) throws OpProcessorException {
@@ -164,6 +304,9 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
                         // compile the traversal - without it getEndStep() has nothing in it
                         traversal.applyStrategies();
                         handleIterator(context, new TraversalIterator(traversal));
+
+                        if (!traversal.getSideEffects().isEmpty())
+                            cache.put(msg.getRequestId(), traversal.getSideEffects());
                     } catch (TimeoutException ex) {
                         final String errorMessage = String.format("Response iteration exceeded the configured threshold for request [%s] - %s", msg.getRequestId(), ex.getMessage());
                         logger.warn(errorMessage);
@@ -195,21 +338,16 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
     }
 
     @Override
-    protected boolean isForceFlushed(final ChannelHandlerContext ctx, final RequestMessage msg, final Iterator itty) {
-        return itty instanceof TraversalIterator && ((TraversalIterator) itty).isNextBatchComingUp();
-    }
-
-    @Override
     protected Map<String, Object> generateMetaData(final ChannelHandlerContext ctx, final RequestMessage msg,
                                                    final ResponseStatusCode code, final Iterator itty) {
         Map<String,Object> metaData = Collections.emptyMap();
-        if (itty instanceof TraversalIterator) {
-            final TraversalIterator traversalIterator = (TraversalIterator) itty;
-            final String key = traversalIterator.getCurrentSideEffectKey();
+        if (itty instanceof SideEffectIterator) {
+            final SideEffectIterator traversalIterator = (SideEffectIterator) itty;
+            final String key = traversalIterator.getSideEffectKey();
             if (key != null) {
                 metaData = new HashMap<>();
                 metaData.put(Tokens.ARGS_SIDE_EFFECT, key);
-                metaData.put(Tokens.ARGS_AGGREGATE_TO, traversalIterator.getCurrentSideEffectAggregator());
+                metaData.put(Tokens.ARGS_AGGREGATE_TO, traversalIterator.getSideEffectAggregator());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/SideEffectIterator.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/SideEffectIterator.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/SideEffectIterator.java
new file mode 100644
index 0000000..3475f03
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/SideEffectIterator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.util;
+
+import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public class SideEffectIterator implements Iterator<Object> {
+
+    private final Iterator<Object> sideEffectIterator;
+    private final String sideEffectKey;
+    private final String sideEffectAggregator;
+
+    public SideEffectIterator(final Object sideEffect, final String sideEffectKey) {
+        this.sideEffectKey = sideEffectKey;
+        sideEffectAggregator = getAggregatorType(sideEffect);
+        sideEffectIterator = sideEffect instanceof BulkSet ?
+                new BulkSetIterator((BulkSet) sideEffect) :
+                IteratorUtils.asIterator(sideEffect);
+    }
+
+    public String getSideEffectKey() {
+        return sideEffectKey;
+    }
+
+    public String getSideEffectAggregator() {
+        return sideEffectAggregator;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return sideEffectIterator.hasNext();
+    }
+
+    @Override
+    public Object next() {
+        return sideEffectIterator.next();
+    }
+
+    private String getAggregatorType(final Object o) {
+        if (o instanceof BulkSet)
+            return Tokens.VAL_AGGREGATE_TO_BULKSET;
+        else if (o instanceof Iterable || o instanceof Iterator)
+            return Tokens.VAL_AGGREGATE_TO_LIST;
+        else if (o instanceof Map)
+            return Tokens.VAL_AGGREGATE_TO_MAP;
+        else
+            return Tokens.VAL_AGGREGATE_TO_NONE;
+    }
+
+    static class BulkSetIterator implements Iterator {
+
+        private final Iterator<Map.Entry<Object,Long>> itty;
+
+        public BulkSetIterator(final BulkSet<Object> bulkSet) {
+            itty = bulkSet.asBulk().entrySet().iterator();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return itty.hasNext();
+        }
+
+        @Override
+        public Object next() {
+            final Map.Entry<Object, Long> entry = itty.next();
+            return new RemoteTraverser<>(entry.getKey(), entry.getValue());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TraversalIterator.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TraversalIterator.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TraversalIterator.java
index 3027d8b..1b3f1bb 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TraversalIterator.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TraversalIterator.java
@@ -18,37 +18,22 @@
  */
 package org.apache.tinkerpop.gremlin.server.util;
 
-import org.apache.tinkerpop.gremlin.driver.Tokens;
 import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraverser;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.HaltedTraverserStrategy;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
-import java.util.Collections;
 import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
 
 /**
- * Provide a way to convert a {@link Traversal} and its related side-effects into a single {@code Iterator}.
- *
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
 public class TraversalIterator implements Iterator<Object> {
 
-    private final Traversal.Admin traversal;
-    private Iterator<Object> traversalIterator;
-    private Iterator<Object> sideEffectIterator = Collections.emptyIterator();
+    private final Iterator<Object> traversalIterator;
     private final HaltedTraverserStrategy haltedTraverserStrategy;
-    private Iterator<String> sideEffectKeys = Collections.emptyIterator();
-    private String currentSideEffectKey = null;
-    private String currentSideEffectAggregator = null;
-    private boolean latch = false;
 
     public TraversalIterator(final Traversal.Admin traversal) {
-        this.traversal = traversal;
         this.traversalIterator = traversal.getEndStep();
         this.haltedTraverserStrategy = traversal.getStrategies().getStrategy(HaltedTraverserStrategy.class).orElse(
                 Boolean.valueOf(System.getProperty("is.testing", "false")) ?
@@ -56,100 +41,14 @@ public class TraversalIterator implements Iterator<Object> {
                         HaltedTraverserStrategy.reference());
     }
 
-    public String getCurrentSideEffectKey() {
-        return this.currentSideEffectKey;
-    }
-
-    public String getCurrentSideEffectAggregator() {
-        return currentSideEffectAggregator;
-    }
-
-    /**
-     * Checks if the next "batch" of results are being returned. The "batch" refers to sets of results - in this case
-     * traversal results and individual sets of traversal side-effects.
-     */
-    public boolean isNextBatchComingUp() {
-        boolean nextBatch = false;
-        if (!latch) {
-            nextBatch = !traversalIterator.hasNext() && !sideEffectIterator.hasNext() && sideEffectKeys.hasNext();
-            if (nextBatch) latch = true;
-        }
-
-        return nextBatch;
-    }
-
     @Override
     public boolean hasNext() {
-        return this.traversalIterator.hasNext() || sideEffectIterator.hasNext() || sideEffectKeys.hasNext();
+        return this.traversalIterator.hasNext();
     }
 
     @Override
     public Object next() {
-        Object next = null;
-
-        // first iterate the traversal end step
-        if (traversalIterator.hasNext()) {
-            final Traverser.Admin t = this.haltedTraverserStrategy.halt((Traverser.Admin) traversalIterator.next());
-            next = new RemoteTraverser<>(t.get(), t.bulk());
-
-            // since there are no items left in the "result" then get the side-effect keys iterator
-            if (!traversalIterator.hasNext())
-                sideEffectKeys = traversal.getSideEffects().keys().iterator();
-        } else {
-            // if there are side-effects and iteration of the current side-effect is not in action then set it off
-            if (sideEffectKeys.hasNext() && !sideEffectIterator.hasNext()) {
-                currentSideEffectKey = sideEffectKeys.next();
-
-                // coerce everything to a state of bulking so that the client deals with a common incoming stream
-                // of BulkedResult
-                final Object currentSideEffect = traversal.getSideEffects().get(currentSideEffectKey);
-                currentSideEffectAggregator = getAggregatorType(currentSideEffect);
-                sideEffectIterator = currentSideEffect instanceof BulkSet ?
-                        new BulkResultIterator((BulkSet) currentSideEffect) :
-                        IteratorUtils.asIterator(currentSideEffect);
-            }
-
-            if (sideEffectIterator.hasNext())
-                next = sideEffectIterator.next();
-        }
-
-        if (null == next) throw new NoSuchElementException();
-
-        // force a reset of the latch as the iterator has moved forward and checking for the next batch can be
-        // re-enabled
-        latch = false;
-
-        return next;
-    }
-
-    private String getAggregatorType(final Object o) {
-        if (o instanceof BulkSet)
-            return Tokens.VAL_AGGREGATE_TO_BULKSET;
-        else if (o instanceof Iterable || o instanceof Iterator)
-            return Tokens.VAL_AGGREGATE_TO_LIST;
-        else if (o instanceof Map)
-            return Tokens.VAL_AGGREGATE_TO_MAP;
-        else
-            return Tokens.VAL_AGGREGATE_TO_NONE;
-    }
-
-    static class BulkResultIterator implements Iterator {
-
-        private final Iterator<Map.Entry<Object,Long>> itty;
-
-        public BulkResultIterator(final BulkSet<Object> bulkSet) {
-            itty = bulkSet.asBulk().entrySet().iterator();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return itty.hasNext();
-        }
-
-        @Override
-        public Object next() {
-            final Map.Entry<Object, Long> entry = itty.next();
-            return new RemoteTraverser<>(entry.getKey(), entry.getValue());
-        }
+        final Traverser.Admin t = this.haltedTraverserStrategy.halt((Traverser.Admin) traversalIterator.next());
+        return new RemoteTraverser<>(t.get(), t.bulk());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/TraversalIteratorTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/TraversalIteratorTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/TraversalIteratorTest.java
deleted file mode 100644
index 699bf10..0000000
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/TraversalIteratorTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.server.util;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
-import org.junit.Test;
-
-import static junit.framework.TestCase.assertEquals;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertNull;
-
-/**
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-public class TraversalIteratorTest {
-    @Test
-    public void shouldIterateWithNoSideEffects() {
-        final Graph graph = TinkerFactory.createModern();
-        final GraphTraversalSource g = graph.traversal();
-        final Traversal t = g.V();
-        final TraversalIterator itty = new TraversalIterator(t.asAdmin());
-
-        int counter = 0;
-        while(itty.hasNext()) {
-            counter++;
-
-            // there are no side effects so there should never be a side-effect key
-            assertNull(itty.getCurrentSideEffectKey());
-            assertThat(itty.isNextBatchComingUp(), is(false));
-
-            itty.next();
-        }
-
-        assertEquals(6, counter);
-    }
-
-    @Test
-    public void shouldIterateWithOneSideEffect() {
-        final Graph graph = TinkerFactory.createModern();
-        final GraphTraversalSource g = graph.traversal();
-        final Traversal t = g.V().aggregate("a");
-        final TraversalIterator itty = new TraversalIterator(t.asAdmin());
-
-        int counter = 0;
-        while(itty.hasNext()) {
-            counter++;
-
-            itty.next();
-
-            if (counter == 6)
-                assertThat(itty.isNextBatchComingUp(), is(true));
-            else
-                assertThat(itty.isNextBatchComingUp(), is(false));
-
-            // first 6 should be "result" and the second 6 should be "a"
-            if (counter <= 6)
-                assertNull(itty.getCurrentSideEffectKey());
-            else
-                assertEquals("a", itty.getCurrentSideEffectKey());
-        }
-
-        assertEquals(12, counter);
-    }
-
-    @Test
-    public void shouldIterateWithMultipleSideEffects() {
-        final Graph graph = TinkerFactory.createModern();
-        final GraphTraversalSource g = graph.traversal();
-        final Traversal t = g.V().aggregate("a").outE("knows").aggregate("b").outV();
-        final TraversalIterator itty = new TraversalIterator(t.asAdmin());
-
-        int counter = 0;
-        while(itty.hasNext()) {
-            counter++;
-
-            // just be sure multiple calls to hasNext() don't foul things up
-            itty.hasNext();
-            itty.hasNext();
-            itty.hasNext();
-
-            itty.next();
-
-            // just be sure multiple calls to hasNext() don't foul things up
-            itty.hasNext();
-            itty.hasNext();
-            itty.hasNext();
-
-            // batches occur at switch between result and "a" side effect then between side-effect "a" and "b"
-            if (counter == 2 || counter == 8)
-                assertThat(itty.isNextBatchComingUp(), is(true));
-            else
-                assertThat(itty.isNextBatchComingUp(), is(false));
-
-            // first 2 should be "result" and the second 6 should be "a" and the third 2 should be "b" - i think it
-            // is safe to assume deterministic order here on the side-effects
-            if (counter <= 2)
-                assertNull(itty.getCurrentSideEffectKey());
-            else if (counter <= 8)
-                assertEquals("a", itty.getCurrentSideEffectKey());
-            else
-                assertEquals("b", itty.getCurrentSideEffectKey());
-        }
-
-        assertEquals(10, counter);
-    }
-}


[2/2] tinkerpop git commit: Major refactoring of TraversalOpProcessor work

Posted by sp...@apache.org.
Major refactoring of TraversalOpProcessor work

Altered the RemoteConnection API (and related interfaces). Used a cache for sideeffects and protocol features that enable retrieval of them on demand.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/a6ab71be
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/a6ab71be
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/a6ab71be

Branch: refs/heads/TINKERPOP-1278
Commit: a6ab71be80f09c2400ec3bf4c6ec080638450c62
Parents: 13f2a14
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Aug 5 16:02:15 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri Aug 5 16:02:15 2016 -0400

----------------------------------------------------------------------
 .../process/remote/RemoteConnection.java        |   9 +-
 .../gremlin/process/remote/RemoteGraph.java     |   4 +
 .../gremlin/process/remote/RemoteResponse.java  |  54 -------
 .../traversal/AbstractRemoteTraversal.java      | 117 ++++++++++++++
 .../AbstractRemoteTraversalSideEffects.java     | 113 +++++++++++++
 .../remote/traversal/RemoteTraversal.java       |  29 ++++
 .../traversal/RemoteTraversalSideEffects.java   |  27 ++++
 .../remote/traversal/step/map/RemoteStep.java   |  15 +-
 .../apache/tinkerpop/gremlin/driver/Client.java |   9 ++
 .../tinkerpop/gremlin/driver/Connection.java    |   2 +-
 .../tinkerpop/gremlin/driver/Handler.java       |  12 +-
 .../tinkerpop/gremlin/driver/ResultQueue.java   |  73 +++------
 .../tinkerpop/gremlin/driver/ResultSet.java     |  18 +--
 .../apache/tinkerpop/gremlin/driver/Tokens.java |  10 +-
 .../gremlin/driver/message/RequestMessage.java  |   8 +
 .../driver/remote/DriverRemoteConnection.java   |  10 +-
 .../driver/remote/DriverRemoteResponse.java     | 107 -------------
 .../driver/remote/DriverRemoteTraversal.java    | 114 +++++++++++++
 .../DriverRemoteTraversalSideEffects.java       |  82 ++++++++++
 .../remote/DriverTraversalSideEffects.java      | 150 -----------------
 .../gremlin/driver/ResultQueueTest.java         | 146 ++++++-----------
 .../tinkerpop/gremlin/driver/ResultSetTest.java |  29 +---
 gremlin-server/pom.xml                          |   5 +
 .../op/traversal/TraversalOpProcessor.java      | 160 +++++++++++++++++--
 .../gremlin/server/util/SideEffectIterator.java |  94 +++++++++++
 .../gremlin/server/util/TraversalIterator.java  | 109 +------------
 .../server/util/TraversalIteratorTest.java      | 126 ---------------
 27 files changed, 867 insertions(+), 765 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
index 870bed4..f2cc399 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.process.remote;
 
+import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -28,7 +29,7 @@ import java.util.Iterator;
  * A simple abstraction of a "connection" to a "server" that is capable of processing a {@link Traversal} and
  * returning results. Results refer to both the {@link Iterator} of results from the submitted {@link Traversal}
  * as well as the side-effects produced by that {@link Traversal}. Those results together are wrapped in a
- * {@link RemoteResponse}.
+ * {@link Traversal}.
  *
  * @author Stephen Mallette (http://stephen.genoprime.com)
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -42,9 +43,9 @@ public interface RemoteConnection extends AutoCloseable {
     public <E> Iterator<Traverser.Admin<E>> submit(final Traversal<?, E> traversal) throws RemoteConnectionException;
 
     /**
-     * Submits {@link Traversal} {@link Bytecode} to a server and returns a {@link RemoteResponse}.
-     * The {@link RemoteResponse} is an abstraction over two types of results that can be returned as part of the
+     * Submits {@link Traversal} {@link Bytecode} to a server and returns a {@link Traversal}.
+     * The {@link Traversal} is an abstraction over two types of results that can be returned as part of the
      * response from the server: the results of the {@link Traversal} itself and the the side-effects that it produced.
      */
-    public <E> RemoteResponse<E> submit(final Bytecode bytecode) throws RemoteConnectionException;
+    public <E> RemoteTraversal<?,E> submit(final Bytecode bytecode) throws RemoteConnectionException;
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteGraph.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteGraph.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteGraph.java
index c5c1cac..9852ed0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteGraph.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteGraph.java
@@ -46,6 +46,10 @@ import java.util.Iterator;
 @Graph.OptIn(Graph.OptIn.SUITE_PROCESS_STANDARD)
 @Graph.OptIn(Graph.OptIn.SUITE_PROCESS_COMPUTER)
 @Graph.OptOut(
+        test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupCountTest",
+        method = "g_V_hasXnoX_groupCountXaX_capXaX",
+        reason = "This test asserts an empty side-effect which reflects as a null rather than an \"empty\" and thus doens't assert")
+@Graph.OptOut(
         test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.BranchTest",
         method = "g_V_branchXlabelX_optionXperson__ageX_optionXsoftware__langX_optionXsoftware__nameX",
         reason = "Issues with Longs")

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteResponse.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteResponse.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteResponse.java
deleted file mode 100644
index ee7a5fd..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteResponse.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.process.remote;
-
-import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraverser;
-import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalSideEffects;
-
-import java.util.Iterator;
-
-/**
- * The {@code RemoteResponse} is returned from {@link RemoteConnection#submit(Bytecode)} and provides implementers a
- * way to represent how they will return the results of a submitted {@link Traversal} and its side-effects. The
- * {@code RemoteResponse} is used internally by traversals spawned from a {@link RemoteGraph} to put remote results
- * into the streams of those traversals and to replace client-side {@link TraversalSideEffects} in those traversals.
- *
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-public interface RemoteResponse<E> {
-
-    /**
-     * Gets the list of results from a {@link Traversal} executed remotely. Implementers may push their results into
-     * a {@link RemoteTraverser} instance to feed that {@code Iterator} or create their own implementation of it if
-     * there is some advantage to doing so.
-     */
-    public Iterator<Traverser.Admin<E>> getTraversers();
-
-    /**
-     * Gets the side-effects (if any) from the remotely executed {@link Traversal}. Simple implementations could
-     * likely use {@link DefaultTraversalSideEffects}, but more advanced implementations might look to lazily load
-     * side-effects or otherwise implement some form of blocking to ensure that all side-effects are present from the
-     * remote location.
-     */
-    public TraversalSideEffects getSideEffects();
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/AbstractRemoteTraversal.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/AbstractRemoteTraversal.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/AbstractRemoteTraversal.java
new file mode 100644
index 0000000..28eeae8
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/AbstractRemoteTraversal.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.remote.traversal;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.TraverserGenerator;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public abstract class AbstractRemoteTraversal<S,E> implements RemoteTraversal<S,E> {
+    @Override
+    public Bytecode getBytecode() {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public List<Step> getSteps() {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public <S2, E2> Admin<S2, E2> addStep(final int index, final Step<?, ?> step) throws IllegalStateException {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public <S2, E2> Admin<S2, E2> removeStep(final int index) throws IllegalStateException {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public void applyStrategies() throws IllegalStateException {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public TraverserGenerator getTraverserGenerator() {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public Set<TraverserRequirement> getTraverserRequirements() {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public void setSideEffects(final TraversalSideEffects sideEffects) {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public void setStrategies(final TraversalStrategies strategies) {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public TraversalStrategies getStrategies() {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public void setParent(final TraversalParent step) {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public TraversalParent getParent() {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public Admin<S, E> clone() {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public boolean isLocked() {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public Optional<Graph> getGraph() {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public void setGraph(final Graph graph) {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/AbstractRemoteTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/AbstractRemoteTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/AbstractRemoteTraversalSideEffects.java
new file mode 100644
index 0000000..9da0754
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/AbstractRemoteTraversalSideEffects.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.remote.traversal;
+
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+
+import java.util.Optional;
+import java.util.function.BinaryOperator;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public abstract class AbstractRemoteTraversalSideEffects implements RemoteTraversalSideEffects {
+
+    @Override
+    public void set(final String key, final Object value) throws IllegalArgumentException {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public void remove(final String key) {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public <V> void register(final String key, final Supplier<V> initialValue, BinaryOperator<V> reducer) {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public <V> void registerIfAbsent(final String key, final Supplier<V> initialValue, BinaryOperator<V> reducer) {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public <V> BinaryOperator<V> getReducer(final String key) throws IllegalArgumentException {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public <V> Supplier<V> getSupplier(final String key) throws IllegalArgumentException {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public void add(final String key, final Object value) throws IllegalArgumentException {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public <S> void setSack(final Supplier<S> initialValue, UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public <S> Supplier<S> getSackInitialValue() {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public <S> UnaryOperator<S> getSackSplitter() {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public <S> BinaryOperator<S> getSackMerger() {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public TraversalSideEffects clone() {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public void mergeInto(final TraversalSideEffects sideEffects) {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public void registerSupplier(final String key, final Supplier supplier) {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) {
+        throw new UnsupportedOperationException("Remote traversals do not support this method");
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.traversalSideEffectsString(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java
new file mode 100644
index 0000000..1f2ac74
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.remote.traversal;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public interface RemoteTraversal<S,E> extends Traversal.Admin<S,E> {
+    @Override
+    public RemoteTraversalSideEffects getSideEffects();
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversalSideEffects.java
new file mode 100644
index 0000000..e41a42d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversalSideEffects.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.remote.traversal;
+
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public interface RemoteTraversalSideEffects extends TraversalSideEffects {
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
index 53b1378..387ef96 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java
@@ -20,14 +20,14 @@ package org.apache.tinkerpop.gremlin.process.remote.traversal.step.map;
 
 import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
 import org.apache.tinkerpop.gremlin.process.remote.RemoteConnectionException;
-import org.apache.tinkerpop.gremlin.process.remote.RemoteResponse;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversal;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraverser;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
-import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 /**
@@ -39,7 +39,7 @@ import java.util.NoSuchElementException;
 public final class RemoteStep<S, E> extends AbstractStep<S, E> {
 
     private transient RemoteConnection remoteConnection;
-    private Iterator<Traverser.Admin<E>> remoteIterator;
+    private RemoteTraversal<?,E> remoteTraversal;
     private Bytecode bytecode;
 
     @SuppressWarnings("unchecked")
@@ -57,16 +57,15 @@ public final class RemoteStep<S, E> extends AbstractStep<S, E> {
     @SuppressWarnings("unchecked")
     @Override
     protected Traverser.Admin<E> processNextStart() throws NoSuchElementException {
-        if (null == this.remoteIterator) {
+        if (null == this.remoteTraversal) {
             try {
-                final RemoteResponse remoteResponse = this.remoteConnection.submit(this.bytecode);
-                this.remoteIterator = remoteResponse.getTraversers();
-                this.traversal.setSideEffects(remoteResponse.getSideEffects());
+                remoteTraversal = this.remoteConnection.submit(this.bytecode);
+                this.traversal.setSideEffects(remoteTraversal.getSideEffects());
             } catch (final RemoteConnectionException sce) {
                 throw new IllegalStateException(sce);
             }
         }
 
-        return this.remoteIterator.next();
+        return (Traverser.Admin<E>) this.remoteTraversal.next();
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 3716411..5183493 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -556,6 +556,15 @@ public abstract class Client {
         }
 
         @Override
+        public CompletableFuture<ResultSet> submitAsync(final RequestMessage msg) {
+            final RequestMessage.Builder builder = RequestMessage.from(msg);
+            if (!aliases.isEmpty())
+                builder.addArg(Tokens.ARGS_ALIASES, aliases);
+
+            return super.submitAsync(builder.create());
+        }
+
+        @Override
         public CompletableFuture<ResultSet> submitAsync(final Traversal traversal) {
             return submitAsync(traversal.asAdmin().getBytecode());
         }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 22e48fe..1fb77f1 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -230,7 +230,7 @@ final class Connection {
 
                         final ResultQueue handler = new ResultQueue(resultLinkedBlockingQueue, readCompleted);
                         pending.put(requestMessage.getRequestId(), handler);
-                        future.complete(new ResultSet(handler, cluster.executor(), readCompleted));
+                        future.complete(new ResultSet(handler, cluster.executor(), readCompleted, requestMessage));
                     }
                 });
         channel.writeAndFlush(requestMessage, promise);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index c9f838a..793f6a6 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -193,23 +193,22 @@ final class Handler {
                         if (data instanceof List) {
                             // unrolls the collection into individual results to be handled by the queue.
                             final List<Object> listToUnroll = (List<Object>) data;
-                            listToUnroll.forEach(item -> tryUnrollBulkedResult(queue, item));
+                            listToUnroll.forEach(item -> tryUnrollTraverser(queue, item));
                         } else {
                             // since this is not a list it can just be added to the queue
-                            tryUnrollBulkedResult(queue, response.getResult().getData());
+                            tryUnrollTraverser(queue, response.getResult().getData());
                         }
                     } else {
                         // this is the side-effect from the server which is generated from a serialized traversal
-                        final String sideEffectKey = meta.get(Tokens.ARGS_SIDE_EFFECT).toString();
                         final String aggregateTo = meta.getOrDefault(Tokens.ARGS_AGGREGATE_TO, Tokens.VAL_AGGREGATE_TO_NONE).toString();
                         if (data instanceof List) {
                             // unrolls the collection into individual results to be handled by the queue.
                             final List<Object> listOfSideEffects = (List<Object>) data;
-                            listOfSideEffects.forEach(sideEffect -> queue.addSideEffect(sideEffectKey, aggregateTo, sideEffect));
+                            listOfSideEffects.forEach(sideEffect -> queue.addSideEffect(aggregateTo, sideEffect));
                         } else {
                             // since this is not a list it can just be added to the queue. this likely shouldn't occur
                             // however as the protocol will typically push everything to list first.
-                            queue.addSideEffect(sideEffectKey, aggregateTo, data);
+                            queue.addSideEffect(aggregateTo, data);
                         }
                     }
                 } else {
@@ -228,10 +227,9 @@ final class Handler {
             }
         }
 
-        private void tryUnrollBulkedResult(final ResultQueue queue, final Object item) {
+        private void tryUnrollTraverser(final ResultQueue queue, final Object item) {
             if (unrollTraversers) {
                 if (item instanceof Traverser.Admin) {
-                    // TODO: i think this is just temporary code - needed for backward compatibility to the old way of serializing Traversal with java serialization
                     final Traverser.Admin t = (Traverser.Admin) item;
                     final Object result = t.get();
                     for (long ix = 0; ix < t.bulk(); ix++) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index 490b3f7..c499a20 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -27,11 +27,9 @@ import org.javatuples.Pair;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -48,7 +46,7 @@ final class ResultQueue {
 
     private final LinkedBlockingQueue<Result> resultLinkedBlockingQueue;
 
-    private final Map<String, Object> sideEffectResult = new LinkedHashMap<>();
+    private Object aggregatedResult = null;
 
     private final AtomicReference<Throwable> error = new AtomicReference<>();
 
@@ -76,79 +74,52 @@ final class ResultQueue {
      * is only returned when a {@link Traversal} is submitted and refers to the side-effects defined in that traversal.
      * A "script" will not return side-effects.
      *
-     * @param k the key of the side-effect
      * @param aggregateTo the value of the {@link ResponseMessage} metadata for {@link Tokens#ARGS_AGGREGATE_TO}.
      * @param sideEffectValue the value of the side-effect itself
      */
-    public void addSideEffect(final String k, final String aggregateTo, final Object sideEffectValue) {
+    public void addSideEffect(final String aggregateTo, final Object sideEffectValue) {
         switch (aggregateTo) {
             case Tokens.VAL_AGGREGATE_TO_BULKSET:
                 if (!(sideEffectValue instanceof Traverser.Admin))
-                    throw new IllegalStateException(String.format("Side-effect \"%s\" value %s is a %s which does not aggregate to %s",
-                            k, sideEffectValue, sideEffectValue.getClass().getSimpleName(), aggregateTo));
+                    throw new IllegalStateException(String.format("Side-effect value %s is a %s which does not aggregate to %s",
+                            sideEffectValue, sideEffectValue.getClass().getSimpleName(), aggregateTo));
 
-                if (!sideEffectResult.containsKey(k))
-                    putIfAbsent(k, new BulkSet());
+                if (null == aggregatedResult) aggregatedResult = new BulkSet();
 
-                final BulkSet<Object> bs = validateAndGet(k, aggregateTo, BulkSet.class);
+                final BulkSet<Object> bs = validate(aggregateTo, BulkSet.class);
                 final Traverser.Admin traverser = (Traverser.Admin) sideEffectValue;
                 bs.add(traverser.get(), traverser.bulk());
                 break;
             case Tokens.VAL_AGGREGATE_TO_LIST:
-                if (!sideEffectResult.containsKey(k))
-                    putIfAbsent(k, new ArrayList());
-
-                final List<Object> list = validateAndGet(k, aggregateTo, List.class);
+                if (null == aggregatedResult) aggregatedResult = new ArrayList();
+                final List<Object> list = validate(aggregateTo, List.class);
                 list.add(sideEffectValue);
                 break;
             case Tokens.VAL_AGGREGATE_TO_MAP:
                 if (!(sideEffectValue instanceof Map.Entry))
-                    throw new IllegalStateException(String.format("Side-effect \"%s\" value %s is a %s which does not aggregate to %s",
-                            k, sideEffectValue, sideEffectValue.getClass().getSimpleName(), aggregateTo));
+                    throw new IllegalStateException(String.format("Side-effect value %s is a %s which does not aggregate to %s",
+                            sideEffectValue, sideEffectValue.getClass().getSimpleName(), aggregateTo));
 
-                if (!sideEffectResult.containsKey(k))
-                    putIfAbsent(k, new HashMap());
+                if (null == aggregatedResult) aggregatedResult =  new HashMap();
 
-                final Map<Object,Object > m = validateAndGet(k, aggregateTo, Map.class);
+                final Map<Object,Object > m = validate(aggregateTo, Map.class);
                 final Map.Entry entry = (Map.Entry) sideEffectValue;
                 m.put(entry.getKey(), entry.getValue());
                 break;
             case Tokens.VAL_AGGREGATE_TO_NONE:
-                if (!sideEffectResult.containsKey(k))
-                    putIfAbsent(k, sideEffectValue);
+                if (null == aggregatedResult) aggregatedResult = sideEffectValue;
                 break;
             default:
                 throw new IllegalStateException(String.format("%s is an invalid value for %s", aggregateTo, Tokens.ARGS_AGGREGATE_TO));
         }
     }
 
-    private <V> V validateAndGet(final String k, final String aggregateTo, final Class<?> expected) {
-        final Object shouldBe = sideEffectResult.get(k);
-        if (!(expected.isAssignableFrom(shouldBe.getClass())))
+    private <V> V validate(final String aggregateTo, final Class<?> expected) {
+        if (!(expected.isAssignableFrom(aggregatedResult.getClass())))
             throw new IllegalStateException(String.format("Side-effect \"%s\" contains the type %s that is not acceptable for %s",
-                    k, shouldBe.getClass().getSimpleName(), aggregateTo));
-
-        return (V) shouldBe;
-    }
-
-    /**
-     * Gets the keys gather for the side-effect. If the queue is still filling (i.e. the read is not complete) then
-     * there could be inconsistent results depending on when this method is called. It would be best to wait to call
-     * this method on {@link #readComplete}.
-     */
-    public Set<String> getSideEffectKeys() {
-        return sideEffectResult.keySet();
-    }
+                    aggregatedResult.getClass().getSimpleName(), aggregateTo));
 
-    /**
-     * Gets the current values for the side-effect. If the queue is still filling (i.e. the read is not complete) then
-     * there could be inconsistent results depending on when this method is called. It would be best to wait to call
-     * this method on {@link #readComplete}.
-     *
-     * @param k the key of the side-effect
-     */
-    public <V> V getSideEffect(final String k) {
-        return (V) sideEffectResult.get(k);
+        return (V) aggregatedResult;
     }
 
     public CompletableFuture<List<Result>> await(final int items) {
@@ -177,6 +148,12 @@ final class ResultQueue {
 
     void markComplete() {
         this.readComplete.complete(null);
+
+        // if there was some aggregation performed in the queue then the full object is hanging out waiting to be
+        // added to the ResultSet
+        if (aggregatedResult != null)
+            add(new Result(aggregatedResult));
+
         this.drainAllWaiting();
     }
 
@@ -186,10 +163,6 @@ final class ResultQueue {
         this.drainAllWaiting();
     }
 
-    private synchronized void putIfAbsent(final String key, final Object o) {
-        sideEffectResult.putIfAbsent(key, o);
-    }
-
     /**
      * Completes the next waiting future if there is one.
      */

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index 81fa49f..ed93fa3 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -18,11 +18,11 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Spliterator;
 import java.util.Spliterators;
@@ -50,24 +50,20 @@ import java.util.stream.StreamSupport;
 public final class ResultSet implements Iterable<Result> {
     private final ResultQueue resultQueue;
     private final ExecutorService executor;
+    private final RequestMessage originalRequestMessage;
 
     private final CompletableFuture<Void> readCompleted;
 
     public ResultSet(final ResultQueue resultQueue, final ExecutorService executor,
-                     final CompletableFuture<Void> readCompleted) {
+                     final CompletableFuture<Void> readCompleted, final RequestMessage originalRequestMessage) {
         this.executor = executor;
         this.resultQueue = resultQueue;
         this.readCompleted = readCompleted;
+        this.originalRequestMessage = originalRequestMessage;
     }
 
-    public CompletableFuture<Map<String,Object>> getSideEffectResults() {
-        final CompletableFuture<Map<String,Object>> future = new CompletableFuture<>();
-        readCompleted.thenRunAsync(() -> {
-            final Map<String,Object> se = new HashMap<>();
-            resultQueue.getSideEffectKeys().forEach(k -> se.put(k, resultQueue.getSideEffect(k)));
-            future.complete(se);
-        }, executor);
-        return future;
+    public RequestMessage getOriginalRequestMessage() {
+        return originalRequestMessage;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
index 00858ff..5542f60 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
@@ -33,6 +33,9 @@ public final class Tokens {
     public static final String OPS_BYTECODE = "bytecode";
     public static final String OPS_EVAL = "eval";
     public static final String OPS_INVALID = "invalid";
+    public static final String OPS_GATHER = "gather";
+    public static final String OPS_KEYS = "keys";
+    public static final String OPS_CLOSE = "close";
 
     /**
      * @deprecated As for release 3.2.2, not replaced as this feature was never really published as official.
@@ -44,12 +47,6 @@ public final class Tokens {
      * @deprecated As for release 3.2.2, not replaced as this feature was never really published as official.
      */
     @Deprecated
-    public static final String OPS_CLOSE = "close";
-
-    /**
-     * @deprecated As for release 3.2.2, not replaced as this feature was never really published as official.
-     */
-    @Deprecated
     public static final String OPS_IMPORT = "import";
 
     /**
@@ -82,6 +79,7 @@ public final class Tokens {
     public static final String ARGS_SASL_MECHANISM = "saslMechanism";
     public static final String ARGS_SIDE_EFFECT = "sideEffect";
     public static final String ARGS_AGGREGATE_TO = "aggregateTo";
+    public static final String ARGS_SIDE_EFFECT_KEY = "sideEffectKey";
 
     /**
      * @deprecated As of release 3.1.0-incubating, replaced by {@link #ARGS_ALIASES}.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java
index 3b9a0a5..7a2ad3d 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java
@@ -95,6 +95,14 @@ public final class RequestMessage {
         return o == null ? Optional.empty() : Optional.of((T) o);
     }
 
+    public static Builder from(final RequestMessage msg) {
+        final Builder builder = build(msg.op)
+                .overrideRequestId(msg.requestId)
+                .processor(msg.processor);
+        msg.args.forEach(builder::addArg);
+        return builder;
+    }
+
     public static Builder build(final String op) {
         return new Builder(op);
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
index 1cff037..d65354c 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
@@ -26,7 +26,7 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decorati
 import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
 import org.apache.tinkerpop.gremlin.process.remote.RemoteConnectionException;
 import org.apache.tinkerpop.gremlin.process.remote.RemoteGraph;
-import org.apache.tinkerpop.gremlin.process.remote.RemoteResponse;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -167,9 +167,9 @@ public class DriverRemoteConnection implements RemoteConnection {
             if (attachElements && !t.asAdmin().getStrategies().getStrategy(VertexProgramStrategy.class).isPresent()) {
                 if (!conf.isPresent()) throw new IllegalStateException("Traverser can't be reattached for testing");
                 final Graph graph = ((Supplier<Graph>) conf.get().getProperty("hidden.for.testing.only")).get();
-                return new DriverRemoteResponse.AttachingTraverserIterator<>(client.submit(t.asAdmin().getBytecode()).iterator(), graph);
+                return new DriverRemoteTraversal.AttachingTraverserIterator<>(client.submit(t.asAdmin().getBytecode()).iterator(), graph);
             } else {
-                return new DriverRemoteResponse.TraverserIterator<>(client.submit(t.asAdmin().getBytecode()).iterator());
+                return new DriverRemoteTraversal.TraverserIterator<>(client.submit(t.asAdmin().getBytecode()).iterator());
             }
         } catch (Exception ex) {
             throw new RemoteConnectionException(ex);
@@ -177,10 +177,10 @@ public class DriverRemoteConnection implements RemoteConnection {
     }
 
     @Override
-    public <E> RemoteResponse<E> submit(final Bytecode bytecode) throws RemoteConnectionException {
+    public <E> RemoteTraversal<?,E> submit(final Bytecode bytecode) throws RemoteConnectionException {
         try {
             final ResultSet rs = client.submit(bytecode);
-            return new DriverRemoteResponse<>(rs, attachElements, conf);
+            return new DriverRemoteTraversal<>(rs, client, attachElements, conf);
         } catch (Exception ex) {
             throw new RemoteConnectionException(ex);
         }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteResponse.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteResponse.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteResponse.java
deleted file mode 100644
index 75a4e26..0000000
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteResponse.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.driver.remote;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.driver.Result;
-import org.apache.tinkerpop.gremlin.driver.ResultSet;
-import org.apache.tinkerpop.gremlin.process.remote.RemoteResponse;
-import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraverser;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.structure.Element;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Property;
-import org.apache.tinkerpop.gremlin.structure.util.Attachable;
-
-import java.util.Iterator;
-import java.util.Optional;
-import java.util.function.Supplier;
-
-/**
- * A {@link RemoteResponse} implementation for the Gremlin Driver.
- *
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-public class DriverRemoteResponse<E> implements RemoteResponse<E> {
-
-    private final Iterator<Traverser.Admin<E>> resultIterator;
-    private final TraversalSideEffects sideEffects;
-
-    public DriverRemoteResponse(final ResultSet rs, final boolean attach, Optional<Configuration> conf) {
-        // attaching is really just for testing purposes. it doesn't make sense in any real-world scenario as it would
-        // require that the client have access to the Graph instance that produced the result. tests need that
-        // attachment process to properly execute in full hence this little hack.
-        if (attach) {
-            if (!conf.isPresent()) throw new IllegalStateException("Traverser can't be reattached for testing");
-            final Graph graph = ((Supplier<Graph>) conf.get().getProperty("hidden.for.testing.only")).get();
-            resultIterator = new AttachingTraverserIterator<>(rs.iterator(), graph);
-        } else {
-            resultIterator = new TraverserIterator<>(rs.iterator());
-        }
-
-        sideEffects = new DriverTraversalSideEffects(rs);
-    }
-
-    @Override
-    public Iterator<Traverser.Admin<E>> getTraversers() {
-        return resultIterator;
-    }
-
-    @Override
-    public TraversalSideEffects getSideEffects() {
-        return sideEffects;
-    }
-
-    static class TraverserIterator<E> implements Iterator<Traverser.Admin<E>> {
-
-        private final Iterator<Result> inner;
-
-        public TraverserIterator(final Iterator<Result> resultIterator) {
-            inner = resultIterator;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return inner.hasNext();
-        }
-
-        @Override
-        public Traverser.Admin<E> next() {
-            return (RemoteTraverser<E>) inner.next().getObject();
-        }
-    }
-
-    static class AttachingTraverserIterator<E> extends TraverserIterator<E> {
-        private final Graph graph;
-
-        public AttachingTraverserIterator(final Iterator<Result> resultIterator, final Graph graph) {
-            super(resultIterator);
-            this.graph = graph;
-        }
-
-        @Override
-        public Traverser.Admin<E> next() {
-            final Traverser.Admin<E> traverser = super.next();
-            if (traverser.get() instanceof Attachable && !(traverser.get() instanceof Property))
-                traverser.set((E) ((Attachable<Element>) traverser.get()).attach(Attachable.Method.get(graph)));
-            return traverser;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java
new file mode 100644
index 0000000..e3a8612
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.remote;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.driver.Client;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.AbstractRemoteTraversal;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraverser;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * A {@link AbstractRemoteTraversal} implementation for the Gremlin Driver.
+ *
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public class DriverRemoteTraversal<S,E> extends AbstractRemoteTraversal<S,E> {
+
+    private final Iterator<Traverser.Admin<E>> resultIterator;
+    private final RemoteTraversalSideEffects sideEffects;
+
+    public DriverRemoteTraversal(final ResultSet rs, final Client client, final boolean attach, final Optional<Configuration> conf) {
+        // attaching is really just for testing purposes. it doesn't make sense in any real-world scenario as it would
+        // require that the client have access to the Graph instance that produced the result. tests need that
+        // attachment process to properly execute in full hence this little hack.
+        if (attach) {
+            if (!conf.isPresent()) throw new IllegalStateException("Traverser can't be reattached for testing");
+            final Graph graph = ((Supplier<Graph>) conf.get().getProperty("hidden.for.testing.only")).get();
+            resultIterator = new AttachingTraverserIterator<>(rs.iterator(), graph);
+        } else {
+            resultIterator = new TraverserIterator<>(rs.iterator());
+        }
+
+        sideEffects = new DriverRemoteTraversalSideEffects(client, rs.getOriginalRequestMessage().getRequestId());
+    }
+
+    @Override
+    public RemoteTraversalSideEffects getSideEffects() {
+        return sideEffects;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return resultIterator.hasNext();
+    }
+
+    @Override
+    public E next() {
+        return (E) resultIterator.next();
+    }
+
+    static class TraverserIterator<E> implements Iterator<Traverser.Admin<E>> {
+
+        private final Iterator<Result> inner;
+
+        public TraverserIterator(final Iterator<Result> resultIterator) {
+            inner = resultIterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return inner.hasNext();
+        }
+
+        @Override
+        public Traverser.Admin<E> next() {
+            return (RemoteTraverser<E>) inner.next().getObject();
+        }
+    }
+
+    static class AttachingTraverserIterator<E> extends TraverserIterator<E> {
+        private final Graph graph;
+
+        public AttachingTraverserIterator(final Iterator<Result> resultIterator, final Graph graph) {
+            super(resultIterator);
+            this.graph = graph;
+        }
+
+        @Override
+        public Traverser.Admin<E> next() {
+            final Traverser.Admin<E> traverser = super.next();
+            if (traverser.get() instanceof Attachable && !(traverser.get() instanceof Property))
+                traverser.set((E) ((Attachable<Element>) traverser.get()).attach(Attachable.Method.get(graph)));
+            return traverser;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java
new file mode 100644
index 0000000..c3c75f7
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.remote;
+
+import org.apache.tinkerpop.gremlin.driver.Client;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.AbstractRemoteTraversalSideEffects;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public class DriverRemoteTraversalSideEffects extends AbstractRemoteTraversalSideEffects {
+
+    private final Client client;
+    private Set<String> keys = null;
+    private final UUID serverSideEffect;
+
+    private final Map<String, Object> sideEffects = new HashMap<>();
+
+    public DriverRemoteTraversalSideEffects(final Client client, final UUID serverSideEffect) {
+        this.client = client;
+        this.serverSideEffect = serverSideEffect;
+    }
+
+    @Override
+    public <V> V get(final String key) throws IllegalArgumentException {
+        if (!sideEffects.containsKey(key)) {
+            final RequestMessage msg = RequestMessage.build(Tokens.OPS_GATHER)
+                    .addArg(Tokens.ARGS_SIDE_EFFECT, serverSideEffect)
+                    .addArg(Tokens.ARGS_SIDE_EFFECT_KEY, key)
+                    .processor("traversal").create();
+            try {
+                final Result result = client.submitAsync(msg).get().one();
+                sideEffects.put(key, null == result ? null : result.getObject());
+            } catch (Exception ex) {
+                throw new RuntimeException("Could not get cache value", ex);
+            }
+        }
+
+        return (V) sideEffects.get(key);
+    }
+
+    @Override
+    public Set<String> keys() {
+        if (null == keys) {
+            final RequestMessage msg = RequestMessage.build(Tokens.OPS_KEYS)
+                    .addArg(Tokens.ARGS_SIDE_EFFECT, serverSideEffect)
+                    .processor("traversal").create();
+            try {
+                keys = client.submitAsync(msg).get().all().get().stream().map(r -> r.getString()).collect(Collectors.toSet());
+            } catch (Exception ex) {
+                throw new RuntimeException("Could not get keys", ex);
+            }
+        }
+
+        return keys;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverTraversalSideEffects.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverTraversalSideEffects.java
deleted file mode 100644
index 6996df4..0000000
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverTraversalSideEffects.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.driver.remote;
-
-import org.apache.tinkerpop.gremlin.driver.ResultSet;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
-import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.BinaryOperator;
-import java.util.function.Supplier;
-import java.util.function.UnaryOperator;
-
-/**
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-public class DriverTraversalSideEffects implements TraversalSideEffects {
-
-    private final ResultSet rs;
-    private CompletableFuture<Map<String, Object>> future = null;
-
-    public DriverTraversalSideEffects(final ResultSet rs) {
-        this.rs = rs;
-    }
-
-    @Override
-    public <V> V get(final String key) throws IllegalArgumentException {
-        initializeFuture();
-        try {
-            return (V) future.get().get(key);
-        } catch (Exception ex) {
-            // TODO: PREEEEEEEEEEETTTTTY DUMPY
-            throw new RuntimeException(ex);
-        }
-    }
-
-    @Override
-    public void set(String key, Object value) throws IllegalArgumentException {
-
-    }
-
-    @Override
-    public void remove(String key) {
-
-    }
-
-    @Override
-    public Set<String> keys() {
-        initializeFuture();
-        try {
-            return future.get().keySet();
-        } catch (Exception ex) {
-            // TODO: PREEEEEEEEEEETTTTTY DUMPY
-            throw new RuntimeException(ex);
-        }
-    }
-
-    @Override
-    public <V> void register(String key, Supplier<V> initialValue, BinaryOperator<V> reducer) {
-
-    }
-
-    @Override
-    public <V> void registerIfAbsent(String key, Supplier<V> initialValue, BinaryOperator<V> reducer) {
-
-    }
-
-    @Override
-    public <V> BinaryOperator<V> getReducer(String key) throws IllegalArgumentException {
-        return null;
-    }
-
-    @Override
-    public <V> Supplier<V> getSupplier(String key) throws IllegalArgumentException {
-        return null;
-    }
-
-    @Override
-    public void add(String key, Object value) throws IllegalArgumentException {
-
-    }
-
-    @Override
-    public <S> void setSack(Supplier<S> initialValue, UnaryOperator<S> splitOperator, BinaryOperator<S> mergeOperator) {
-
-    }
-
-    @Override
-    public <S> Supplier<S> getSackInitialValue() {
-        return null;
-    }
-
-    @Override
-    public <S> UnaryOperator<S> getSackSplitter() {
-        return null;
-    }
-
-    @Override
-    public <S> BinaryOperator<S> getSackMerger() {
-        return null;
-    }
-
-    @Override
-    public TraversalSideEffects clone() {
-        return null;
-    }
-
-    @Override
-    public void mergeInto(TraversalSideEffects sideEffects) {
-
-    }
-
-    @Override
-    public void registerSupplier(String key, Supplier supplier) {
-
-    }
-
-    @Override
-    public <V> Optional<Supplier<V>> getRegisteredSupplier(String key) {
-        return null;
-    }
-
-    @Override
-    public String toString() {
-        return StringFactory.traversalSideEffectsString(this);
-    }
-
-    private synchronized void initializeFuture() {
-        if (null == future) future = rs.getSideEffectResults();
-    }
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java
index 7523e79..40adc27 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java
@@ -296,85 +296,54 @@ public class ResultQueueTest extends AbstractResultQueueTest {
     }
 
     @Test
-    public void shouldHandleBulkSetSideEffects() {
-        assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+    public void shouldHandleBulkSetSideEffects() throws Exception  {
+        final CompletableFuture<List<Result>> o = resultQueue.await(1);
+        assertThat(o.isDone(), is(false));
 
-        resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("stephen", 1));
-        assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
-        assertEquals(1, ((BulkSet) resultQueue.getSideEffect("a")).get("stephen"));
+        resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("brian", 2));
+        assertThat(o.isDone(), is(false));
 
-        resultQueue.addSideEffect("b", Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("brian", 2));
-        assertThat(resultQueue.getSideEffectKeys(), hasItem("b"));
-        assertEquals(2, ((BulkSet) resultQueue.getSideEffect("b")).get("brian"));
+        resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("brian", 2));
+        assertThat(o.isDone(), is(false));
 
-        resultQueue.addSideEffect("b", Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("brian", 2));
-        assertThat(resultQueue.getSideEffectKeys(), hasItem("b"));
-        assertEquals(4, ((BulkSet) resultQueue.getSideEffect("b")).get("brian"));
+        resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("belinda", 6));
+        assertThat(o.isDone(), is(false));
 
-        resultQueue.addSideEffect("b", Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("belinda", 6));
-        assertThat(resultQueue.getSideEffectKeys(), hasItem("b"));
-        assertEquals(6, ((BulkSet) resultQueue.getSideEffect("b")).get("belinda"));
+        resultQueue.markComplete();
 
+        assertThat(o.isDone(), is(true));
+        final BulkSet<String> bulkSet = o.get().get(0).get(BulkSet.class);
+        assertEquals(4, bulkSet.get("brian"));
+        assertEquals(6, bulkSet.get("belinda"));
     }
 
     @Test
-    public void shouldNotMixAggregatesForBulkSet() {
-        assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+    public void shouldHandleListSideEffects() throws Exception {
+        final CompletableFuture<List<Result>> o = resultQueue.await(1);
+        assertThat(o.isDone(), is(false));
 
-        resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("stephen", 1));
-        assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
-        assertEquals(1, ((BulkSet) resultQueue.getSideEffect("a")).get("stephen"));
+        resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_LIST, "stephen");
+        assertThat(o.isDone(), is(false));
 
-        try {
-            resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_BULKSET, Arrays.asList("stephen", "kathy", "alice"));
-        } catch (Exception ex) {
-            assertThat(ex, instanceOf(IllegalStateException.class));
-            assertEquals("Side-effect \"a\" value [stephen, kathy, alice] is a ArrayList which does not aggregate to bulkset", ex.getMessage());
-        }
-    }
-
-    @Test
-    public void shouldHandleListSideEffects() {
-        assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
-
-        resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_LIST, "stephen");
-        assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
-        List<String> l = resultQueue.getSideEffect("a");
-        assertEquals(1, l.size());
-        assertEquals("stephen", l.get(0));
-
-        resultQueue.addSideEffect("d", Tokens.VAL_AGGREGATE_TO_LIST, "daniel");
-        assertThat(resultQueue.getSideEffectKeys(), hasItem("d"));
-        l = resultQueue.getSideEffect("d");
-        assertEquals(1, l.size());
-        assertEquals("daniel", l.get(0));
-
-        resultQueue.addSideEffect("d", Tokens.VAL_AGGREGATE_TO_LIST, "dave");
-        assertThat(resultQueue.getSideEffectKeys(), hasItem("d"));
-        l = resultQueue.getSideEffect("d");
-        assertEquals(2, l.size());
-        assertThat(l, contains("daniel","dave"));
-    }
+        resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_LIST, "daniel");
+        assertThat(o.isDone(), is(false));
 
-    @Test
-    public void shouldNotMixAggregatesForList() {
-        assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+        resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_LIST, "dave");
+        assertThat(o.isDone(), is(false));
 
-        resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_BULKSET, new RemoteTraverser<>("stephen", 1));
-        assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
-        assertEquals(1, ((BulkSet) resultQueue.getSideEffect("a")).get("stephen"));
+        resultQueue.markComplete();
 
-        try {
-            resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_LIST, Arrays.asList("stephen", "kathy", "alice"));
-        } catch (Exception ex) {
-            assertThat(ex, instanceOf(IllegalStateException.class));
-            assertEquals("Side-effect \"a\" contains the type BulkSet that is not acceptable for list", ex.getMessage());
-        }
+        assertThat(o.isDone(), is(true));
+        final List<String> list = o.get().get(0).get(ArrayList.class);
+        assertEquals("stephen", list.get(0));
+        assertEquals("daniel", list.get(1));
+        assertEquals("dave", list.get(2));
     }
 
     @Test
-    public void shouldHandleMapSideEffects() {
-        assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+    public void shouldHandleMapSideEffects() throws Exception {
+        final CompletableFuture<List<Result>> o = resultQueue.await(1);
+        assertThat(o.isDone(), is(false));
 
         final Map<String,String> m = new HashMap<>();
         m.put("s", "stephen");
@@ -382,47 +351,38 @@ public class ResultQueueTest extends AbstractResultQueueTest {
         m.put("d", "daniel");
 
         m.entrySet().forEach(e -> {
-            resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_MAP, e);
-            assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
-            assertEquals(e.getValue(), ((Map) resultQueue.getSideEffect("a")).get(e.getKey()));
+            resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_MAP, e);
+            assertThat(o.isDone(), is(false));
         });
 
-        assertEquals(3, ((Map) resultQueue.getSideEffect("a")).size());
-    }
-
-    @Test
-    public void shouldNotMixAggregatesForMap() {
-        assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
-
-        final Map<String,String> m = new HashMap<>();
-        m.put("s", "stephen");
-
-        resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_MAP, m.entrySet().iterator().next());
-        assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
-        assertEquals("stephen", ((Map) resultQueue.getSideEffect("a")).get("s"));
+        resultQueue.markComplete();
 
-        try {
-            resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_MAP, Arrays.asList("stephen", "kathy", "alice"));
-        } catch (Exception ex) {
-            assertThat(ex, instanceOf(IllegalStateException.class));
-            assertEquals("Side-effect \"a\" value [stephen, kathy, alice] is a ArrayList which does not aggregate to map", ex.getMessage());
-        }
+        assertThat(o.isDone(), is(true));
+        final Map<String, String> list = o.get().get(0).get(HashMap.class);
+        assertEquals("stephen", list.get("s"));
+        assertEquals("daniel", list.get("d"));
+        assertEquals("marko", list.get("m"));
     }
 
+
     @Test
-    public void shouldHandleNotAggregateSideEffects() {
-        assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+    public void shouldHandleNotAggregateSideEffects() throws Exception  {
+        final CompletableFuture<List<Result>> o = resultQueue.await(1);
+        assertThat(o.isDone(), is(false));
 
         final Map<String,String> m = new HashMap<>();
         m.put("s", "stephen");
         m.put("m", "marko");
         m.put("d", "daniel");
 
-        resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_NONE, m);
-        assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
-        assertEquals("stephen", ((Map) resultQueue.getSideEffect("a")).get("s"));
-        assertEquals("marko", ((Map) resultQueue.getSideEffect("a")).get("m"));
-        assertEquals("daniel", ((Map) resultQueue.getSideEffect("a")).get("d"));
-        assertEquals(3, ((Map) resultQueue.getSideEffect("a")).size());
+        resultQueue.addSideEffect(Tokens.VAL_AGGREGATE_TO_NONE, m);
+
+        resultQueue.markComplete();
+
+        assertThat(o.isDone(), is(true));
+        final Map<String, String> list = o.get().get(0).get(HashMap.class);
+        assertEquals("stephen", list.get("s"));
+        assertEquals("daniel", list.get("d"));
+        assertEquals("marko", list.get("m"));
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
index f5588ab..768ecc1 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -44,7 +45,7 @@ public class ResultSetTest extends AbstractResultQueueTest {
 
     @Before
     public void setupThis() {
-        resultSet = new ResultSet(resultQueue, pool, readCompleted);
+        resultSet = new ResultSet(resultQueue, pool, readCompleted, RequestMessage.build("traversal").create());
     }
 
     @Test
@@ -194,30 +195,4 @@ public class ResultSetTest extends AbstractResultQueueTest {
 
         assertEquals(100, counter.get());
     }
-
-    @Test
-    public void shouldRetrieveSideEffects() throws Exception {
-        final Iterator itty = resultSet.iterator();
-        final CompletableFuture<Map<String,Object>> sideEffects = resultSet.getSideEffectResults();
-
-        assertThat(sideEffects.isDone(), is(false));
-
-        // queue is not marked finished so the side effect future is still not complete
-        addToQueue(100, 1, true, false);
-
-        for (int i = 0; i < 101; i++) {
-            assertThat(itty.hasNext(), is(true));
-        }
-
-        // now complete the queue
-        addToQueue(0, 1, true, true, 0);
-
-        // addToQueue doesn't block for "read complete" so gotta spin the thread
-        while (!readCompleted.isDone()) {
-            Thread.sleep(10);
-        }
-
-        // side effects are empty in this case, but that's fine for the purpose of this test
-        assertThat(sideEffects.isDone(), is(true));
-    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a6ab71be/gremlin-server/pom.xml
----------------------------------------------------------------------
diff --git a/gremlin-server/pom.xml b/gremlin-server/pom.xml
index 2e7fc35..971baca 100644
--- a/gremlin-server/pom.xml
+++ b/gremlin-server/pom.xml
@@ -45,6 +45,11 @@ limitations under the License.
             <groupId>commons-collections</groupId>
             <artifactId>commons-collections</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>2.3.1</version>
+        </dependency>
         <!-- METRICS -->
         <dependency>
             <groupId>com.codahale.metrics</groupId>