You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2018/08/16 15:40:34 UTC

[46/50] tinkerpop git commit: TINKERPOP-1913 Made status attributes available to the ResultSet

TINKERPOP-1913 Made status attributes available to the ResultSet


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/a9ab8219
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/a9ab8219
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/a9ab8219

Branch: refs/heads/TINKERPOP-1913
Commit: a9ab8219ad6462ed5c2292802166d793df0e6aec
Parents: 1250129
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Wed Mar 7 10:29:11 2018 -0500
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Aug 16 09:19:14 2018 -0400

----------------------------------------------------------------------
 .../tinkerpop/gremlin/driver/Handler.java       | 13 ++++-
 .../tinkerpop/gremlin/driver/ResultQueue.java   |  2 +
 .../tinkerpop/gremlin/driver/ResultSet.java     | 12 +++++
 .../tinkerpop/gremlin/driver/ResultSetTest.java | 25 +++++++++
 .../gremlin/server/op/AbstractOpProcessor.java  | 56 ++++++++++++++++----
 .../op/traversal/TraversalOpProcessor.java      | 10 +++-
 .../server/GremlinResultSetIntegrateTest.java   |  9 ++++
 7 files changed, 115 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a9ab8219/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index 68e1631..4677bee 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -259,9 +259,18 @@ final class Handler {
                     }
                 }
 
-                // as this is a non-PARTIAL_CONTENT code - the stream is done
-                if (response.getStatus().getCode() != ResponseStatusCode.PARTIAL_CONTENT)
+                // the last message in a OK stream could have meta-data that is useful to the result. note that error
+                // handling of the status attributes is handled separately above
+                if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.NO_CONTENT) {
+                    // in 3.4.0 this should get refactored. i think the that the markComplete() could just take the
+                    // status attributes as its argument - need to investigate that further
+                    queue.statusAttributes = response.getStatus().getAttributes();
+                }
+
+                // as this is a non-PARTIAL_CONTENT code - the stream is done.
+                if (statusCode != ResponseStatusCode.PARTIAL_CONTENT) {
                     pending.remove(response.getRequestId()).markComplete();
+                }
             } finally {
                 // in the event of an exception above the exception is tossed and handled by whatever channelpipeline
                 // error handling is at play.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a9ab8219/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index e21e265..7340763 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -56,6 +56,8 @@ final class ResultQueue {
 
     private final Queue<Pair<CompletableFuture<List<Result>>,Integer>> waiting = new ConcurrentLinkedQueue<>();
 
+    Map<String,Object> statusAttributes = null;
+
     public ResultQueue(final LinkedBlockingQueue<Result> resultLinkedBlockingQueue, final CompletableFuture<Void> readComplete) {
         this.resultLinkedBlockingQueue = resultLinkedBlockingQueue;
         this.readComplete = readComplete;

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a9ab8219/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index f82876c..f608f06 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -21,8 +21,10 @@ package org.apache.tinkerpop.gremlin.driver;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Spliterator;
 import java.util.Spliterators;
@@ -74,6 +76,16 @@ public final class ResultSet implements Iterable<Result> {
     }
 
     /**
+     * Returns a future that will complete when {@link #allItemsAvailable()} is {@code true} and will contain the
+     * attributes from the response.
+     */
+    public CompletableFuture<Map<String,Object>> statusAttributes() {
+        final CompletableFuture<Map<String,Object>> attrs = new CompletableFuture<>();
+        readCompleted.thenRun(() -> attrs.complete(null == resultQueue.statusAttributes ? Collections.emptyMap() : resultQueue.statusAttributes));
+        return attrs;
+    }
+
+    /**
      * Determines if all items have been returned to the client.
      */
     public boolean allItemsAvailable() {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a9ab8219/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
index 0cf4fb5..3163ffe 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
@@ -22,8 +22,10 @@ import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -49,6 +51,29 @@ public class ResultSetTest extends AbstractResultQueueTest {
     }
 
     @Test
+    public void shouldReturnResponseAttributes() throws Exception {
+        resultQueue.statusAttributes = new HashMap<String,Object>() {{
+            put("test",123);
+            put("junk","here");
+        }};
+
+        final CompletableFuture<Map<String,Object>> attrs = resultSet.statusAttributes();
+        readCompleted.complete(null);
+
+        final Map<String,Object> m = attrs.get();
+        assertEquals(123, m.get("test"));
+        assertEquals("here", m.get("junk"));
+        assertEquals(2, m.size());
+    }
+
+    @Test
+    public void shouldReturnEmptyMapForNoResponseAttributes() throws Exception {
+        final CompletableFuture<Map<String,Object>> attrs = resultSet.statusAttributes();
+        readCompleted.complete(null);
+        assertThat(attrs.get().isEmpty(), is(true));
+    }
+
+    @Test
     public void shouldHaveAllItemsAvailableAsynchronouslyOnReadComplete() {
         final CompletableFuture<Void> all = resultSet.allItemsAvailableAsync();
         assertThat(all.isDone(), is(false));

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a9ab8219/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
index 331b762..767445a 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -67,7 +68,6 @@ public abstract class AbstractOpProcessor implements OpProcessor {
      *
      * @param context The Gremlin Server {@link Context} object containing settings, request message, etc.
      * @param itty The result to iterator
-     * @throws TimeoutException if the time taken to serialize the entire result set exceeds the allowable time.
      * @see #handleIterator(ResponseHandlerContext, Iterator)
      */
     protected void handleIterator(final Context context, final Iterator itty) throws InterruptedException {
@@ -146,7 +146,9 @@ public abstract class AbstractOpProcessor implements OpProcessor {
                     // thread that processed the eval of the script so, we have to push serialization down into that
                     Frame frame = null;
                     try {
-                        frame = makeFrame(rhc, msg, serializer, useBinary, aggregate, code, generateMetaData(ctx, msg, code, itty));
+                        frame = makeFrame(rhc, msg, serializer, useBinary, aggregate, code,
+                                generateResultMetaData(ctx, msg, code, itty, settings),
+                                generateStatusAttributes(ctx, msg, code, itty, settings));
                     } catch (Exception ex) {
                         // a frame may use a Bytebuf which is a countable release - if it does not get written
                         // downstream it needs to be released here
@@ -231,36 +233,71 @@ public abstract class AbstractOpProcessor implements OpProcessor {
     }
 
     /**
-     * Generates meta-data to put on a {@link ResponseMessage}.
+     * Generates response result meta-data to put on a {@link ResponseMessage}.
      *
      * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in
      *             this method
+     * @deprecated As of release 3.3.2, replaced by {@link #generateResultMetaData(ChannelHandlerContext, RequestMessage, ResponseStatusCode, Iterator, Settings)}
      */
-    protected Map<String,Object> generateMetaData(final ChannelHandlerContext ctx, final RequestMessage msg,
-                                                  final ResponseStatusCode code, final Iterator itty) {
+    @Deprecated
+    protected Map<String, Object> generateMetaData(final ChannelHandlerContext ctx, final RequestMessage msg,
+                                                   final ResponseStatusCode code, final Iterator itty) {
         return Collections.emptyMap();
     }
 
     /**
-     * Caution: {@link #makeFrame(ResponseHandlerContext, RequestMessage, MessageSerializer, boolean, List, ResponseStatusCode, Map)}
+     * Generates response result meta-data to put on a {@link ResponseMessage}.
+     *
+     * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in
+     *             this method
+     */
+    protected Map<String, Object> generateResultMetaData(final ChannelHandlerContext ctx, final RequestMessage msg,
+                                                         final ResponseStatusCode code, final Iterator itty,
+                                                         final Settings settings) {
+        return generateMetaData(ctx, msg, code, itty);
+    }
+
+    /**
+     * Generates response status meta-data to put on a {@link ResponseMessage}.
+     *
+     * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in
+     *             this method
+     */
+    protected Map<String, Object> generateStatusAttributes(final ChannelHandlerContext ctx, final RequestMessage msg,
+                                                           final ResponseStatusCode code, final Iterator itty,
+                                                           final Settings settings) {
+        // only return server metadata on the last message
+        if (itty.hasNext()) return Collections.emptyMap();
+
+        final Map<String, Object> metaData = new HashMap<>();
+        metaData.put(Tokens.ARGS_HOST, ctx.channel().remoteAddress().toString());
+
+        return metaData;
+    }
+
+    /**
+     * Caution: {@link #makeFrame(ResponseHandlerContext, RequestMessage, MessageSerializer, boolean, List, ResponseStatusCode, Map, Map)}
      * should be used instead of this method whenever a {@link ResponseHandlerContext} is available.
      */
     protected static Frame makeFrame(final ChannelHandlerContext ctx, final RequestMessage msg,
                                      final MessageSerializer serializer, final boolean useBinary, final List<Object> aggregate,
-                                     final ResponseStatusCode code, final Map<String,Object> responseMetaData) throws Exception {
+                                     final ResponseStatusCode code, final Map<String,Object> responseMetaData,
+                                     final Map<String,Object> statusAttributes) throws Exception {
         final Context context = new Context(msg, ctx, null, null, null, null); // dummy context, good only for writing response messages to the channel
         final ResponseHandlerContext rhc = new ResponseHandlerContext(context);
-        return makeFrame(rhc, msg, serializer, useBinary, aggregate, code, responseMetaData);
+        return makeFrame(rhc, msg, serializer, useBinary, aggregate, code, responseMetaData, statusAttributes);
     }
 
     protected static Frame makeFrame(final ResponseHandlerContext rhc, final RequestMessage msg,
                                      final MessageSerializer serializer, final boolean useBinary, final List<Object> aggregate,
-                                     final ResponseStatusCode code, final Map<String,Object> responseMetaData) throws Exception {
+                                     final ResponseStatusCode code, final Map<String,Object> responseMetaData,
+                                     final Map<String,Object> statusAttributes) throws Exception {
         final ChannelHandlerContext ctx = rhc.getContext().getChannelHandlerContext();
         try {
             if (useBinary) {
                 return new Frame(serializer.serializeResponseAsBinary(ResponseMessage.build(msg)
                         .code(code)
+                        .statusAttributes(statusAttributes)
                         .responseMetaData(responseMetaData)
                         .result(aggregate).create(), ctx.alloc()));
             } else {
@@ -269,6 +306,7 @@ public abstract class AbstractOpProcessor implements OpProcessor {
                 final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
                 return new Frame(textSerializer.serializeResponseAsString(ResponseMessage.build(msg)
                         .code(code)
+                        .statusAttributes(statusAttributes)
                         .responseMetaData(responseMetaData)
                         .result(aggregate).create()));
             }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a9ab8219/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
index ca035c7..e5383ac 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
@@ -449,6 +449,9 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
     @Override
     protected Map<String, Object> generateMetaData(final ChannelHandlerContext ctx, final RequestMessage msg,
                                                    final ResponseStatusCode code, final Iterator itty) {
+        // leaving this overriding the deprecated version of this method because it provides a decent test to those
+        // who might have their own OpProcessor implementations that apply meta-data. leaving this alone helps validate
+        // that the upgrade path is clean. we can remove this in 3.4.0
         Map<String, Object> metaData = Collections.emptyMap();
         if (itty instanceof SideEffectIterator) {
             final SideEffectIterator traversalIterator = (SideEffectIterator) itty;
@@ -458,6 +461,9 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
                 metaData.put(Tokens.ARGS_SIDE_EFFECT_KEY, key);
                 metaData.put(Tokens.ARGS_AGGREGATE_TO, traversalIterator.getSideEffectAggregator());
             }
+        } else {
+            // this is a standard traversal iterator
+            metaData = super.generateMetaData(ctx, msg, code, itty);
         }
 
         return metaData;
@@ -530,7 +536,9 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
                     // thread that processed the eval of the script so, we have to push serialization down into that
                     Frame frame = null;
                     try {
-                        frame = makeFrame(ctx, msg, serializer, useBinary, aggregate, code, generateMetaData(ctx, msg, code, itty));
+                        frame = makeFrame(ctx, msg, serializer, useBinary, aggregate, code,
+                                generateResultMetaData(ctx, msg, code, itty, settings),
+                                generateStatusAttributes(ctx, msg, code, itty, settings));
                     } catch (Exception ex) {
                         // a frame may use a Bytebuf which is a countable release - if it does not get written
                         // downstream it needs to be released here

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a9ab8219/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinResultSetIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinResultSetIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinResultSetIntegrateTest.java
index 66e2c94..bd71f1b 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinResultSetIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinResultSetIntegrateTest.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.driver.Cluster;
 import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
 import org.apache.tinkerpop.gremlin.driver.Result;
 import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.Tokens;
 import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0;
 import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
 import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
@@ -53,6 +54,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.is;
@@ -88,6 +90,13 @@ public class GremlinResultSetIntegrateTest extends AbstractGremlinServerIntegrat
     }
 
     @Test
+    public void shouldReturnResponseAttributes() throws Exception {
+        final ResultSet results = client.submit("g.V()");
+        final Map<String,Object> attr = results.statusAttributes().get(20000, TimeUnit.MILLISECONDS);
+        assertThat(attr.containsKey(Tokens.ARGS_HOST), is(true));
+    }
+
+    @Test
     public void shouldHandleVertexResultFromTraversalBulked() throws Exception {
         final Graph graph = TinkerGraph.open();
         final GraphTraversalSource g = graph.traversal();