You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2018/08/16 15:40:34 UTC
[46/50] tinkerpop git commit: TINKERPOP-1913 Made status attributes
available to the ResultSet
TINKERPOP-1913 Made status attributes available to the ResultSet
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/a9ab8219
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/a9ab8219
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/a9ab8219
Branch: refs/heads/TINKERPOP-1913
Commit: a9ab8219ad6462ed5c2292802166d793df0e6aec
Parents: 1250129
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Wed Mar 7 10:29:11 2018 -0500
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Aug 16 09:19:14 2018 -0400
----------------------------------------------------------------------
.../tinkerpop/gremlin/driver/Handler.java | 13 ++++-
.../tinkerpop/gremlin/driver/ResultQueue.java | 2 +
.../tinkerpop/gremlin/driver/ResultSet.java | 12 +++++
.../tinkerpop/gremlin/driver/ResultSetTest.java | 25 +++++++++
.../gremlin/server/op/AbstractOpProcessor.java | 56 ++++++++++++++++----
.../op/traversal/TraversalOpProcessor.java | 10 +++-
.../server/GremlinResultSetIntegrateTest.java | 9 ++++
7 files changed, 115 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a9ab8219/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index 68e1631..4677bee 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -259,9 +259,18 @@ final class Handler {
}
}
- // as this is a non-PARTIAL_CONTENT code - the stream is done
- if (response.getStatus().getCode() != ResponseStatusCode.PARTIAL_CONTENT)
+ // the last message in a OK stream could have meta-data that is useful to the result. note that error
+ // handling of the status attributes is handled separately above
+ if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.NO_CONTENT) {
+ // in 3.4.0 this should get refactored. i think the that the markComplete() could just take the
+ // status attributes as its argument - need to investigate that further
+ queue.statusAttributes = response.getStatus().getAttributes();
+ }
+
+ // as this is a non-PARTIAL_CONTENT code - the stream is done.
+ if (statusCode != ResponseStatusCode.PARTIAL_CONTENT) {
pending.remove(response.getRequestId()).markComplete();
+ }
} finally {
// in the event of an exception above the exception is tossed and handled by whatever channelpipeline
// error handling is at play.
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a9ab8219/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index e21e265..7340763 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -56,6 +56,8 @@ final class ResultQueue {
private final Queue<Pair<CompletableFuture<List<Result>>,Integer>> waiting = new ConcurrentLinkedQueue<>();
+ Map<String,Object> statusAttributes = null;
+
public ResultQueue(final LinkedBlockingQueue<Result> resultLinkedBlockingQueue, final CompletableFuture<Void> readComplete) {
this.resultLinkedBlockingQueue = resultLinkedBlockingQueue;
this.readComplete = readComplete;
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a9ab8219/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index f82876c..f608f06 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -21,8 +21,10 @@ package org.apache.tinkerpop.gremlin.driver;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
@@ -74,6 +76,16 @@ public final class ResultSet implements Iterable<Result> {
}
/**
+ * Returns a future that will complete when {@link #allItemsAvailable()} is {@code true} and will contain the
+ * attributes from the response.
+ */
+ public CompletableFuture<Map<String,Object>> statusAttributes() {
+ final CompletableFuture<Map<String,Object>> attrs = new CompletableFuture<>();
+ readCompleted.thenRun(() -> attrs.complete(null == resultQueue.statusAttributes ? Collections.emptyMap() : resultQueue.statusAttributes));
+ return attrs;
+ }
+
+ /**
* Determines if all items have been returned to the client.
*/
public boolean allItemsAvailable() {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a9ab8219/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
index 0cf4fb5..3163ffe 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
@@ -22,8 +22,10 @@ import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.junit.Before;
import org.junit.Test;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -49,6 +51,29 @@ public class ResultSetTest extends AbstractResultQueueTest {
}
@Test
+ public void shouldReturnResponseAttributes() throws Exception {
+ resultQueue.statusAttributes = new HashMap<String,Object>() {{
+ put("test",123);
+ put("junk","here");
+ }};
+
+ final CompletableFuture<Map<String,Object>> attrs = resultSet.statusAttributes();
+ readCompleted.complete(null);
+
+ final Map<String,Object> m = attrs.get();
+ assertEquals(123, m.get("test"));
+ assertEquals("here", m.get("junk"));
+ assertEquals(2, m.size());
+ }
+
+ @Test
+ public void shouldReturnEmptyMapForNoResponseAttributes() throws Exception {
+ final CompletableFuture<Map<String,Object>> attrs = resultSet.statusAttributes();
+ readCompleted.complete(null);
+ assertThat(attrs.get().isEmpty(), is(true));
+ }
+
+ @Test
public void shouldHaveAllItemsAvailableAsynchronouslyOnReadComplete() {
final CompletableFuture<Void> all = resultSet.allItemsAvailableAsync();
assertThat(all.isDone(), is(false));
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a9ab8219/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
index 331b762..767445a 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -67,7 +68,6 @@ public abstract class AbstractOpProcessor implements OpProcessor {
*
* @param context The Gremlin Server {@link Context} object containing settings, request message, etc.
* @param itty The result to iterator
- * @throws TimeoutException if the time taken to serialize the entire result set exceeds the allowable time.
* @see #handleIterator(ResponseHandlerContext, Iterator)
*/
protected void handleIterator(final Context context, final Iterator itty) throws InterruptedException {
@@ -146,7 +146,9 @@ public abstract class AbstractOpProcessor implements OpProcessor {
// thread that processed the eval of the script so, we have to push serialization down into that
Frame frame = null;
try {
- frame = makeFrame(rhc, msg, serializer, useBinary, aggregate, code, generateMetaData(ctx, msg, code, itty));
+ frame = makeFrame(rhc, msg, serializer, useBinary, aggregate, code,
+ generateResultMetaData(ctx, msg, code, itty, settings),
+ generateStatusAttributes(ctx, msg, code, itty, settings));
} catch (Exception ex) {
// a frame may use a Bytebuf which is a countable release - if it does not get written
// downstream it needs to be released here
@@ -231,36 +233,71 @@ public abstract class AbstractOpProcessor implements OpProcessor {
}
/**
- * Generates meta-data to put on a {@link ResponseMessage}.
+ * Generates response result meta-data to put on a {@link ResponseMessage}.
*
* @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in
* this method
+ * @deprecated As of release 3.3.2, replaced by {@link #generateResultMetaData(ChannelHandlerContext, RequestMessage, ResponseStatusCode, Iterator, Settings)}
*/
- protected Map<String,Object> generateMetaData(final ChannelHandlerContext ctx, final RequestMessage msg,
- final ResponseStatusCode code, final Iterator itty) {
+ @Deprecated
+ protected Map<String, Object> generateMetaData(final ChannelHandlerContext ctx, final RequestMessage msg,
+ final ResponseStatusCode code, final Iterator itty) {
return Collections.emptyMap();
}
/**
- * Caution: {@link #makeFrame(ResponseHandlerContext, RequestMessage, MessageSerializer, boolean, List, ResponseStatusCode, Map)}
+ * Generates response result meta-data to put on a {@link ResponseMessage}.
+ *
+ * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in
+ * this method
+ */
+ protected Map<String, Object> generateResultMetaData(final ChannelHandlerContext ctx, final RequestMessage msg,
+ final ResponseStatusCode code, final Iterator itty,
+ final Settings settings) {
+ return generateMetaData(ctx, msg, code, itty);
+ }
+
+ /**
+ * Generates response status meta-data to put on a {@link ResponseMessage}.
+ *
+ * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in
+ * this method
+ */
+ protected Map<String, Object> generateStatusAttributes(final ChannelHandlerContext ctx, final RequestMessage msg,
+ final ResponseStatusCode code, final Iterator itty,
+ final Settings settings) {
+ // only return server metadata on the last message
+ if (itty.hasNext()) return Collections.emptyMap();
+
+ final Map<String, Object> metaData = new HashMap<>();
+ metaData.put(Tokens.ARGS_HOST, ctx.channel().remoteAddress().toString());
+
+ return metaData;
+ }
+
+ /**
+ * Caution: {@link #makeFrame(ResponseHandlerContext, RequestMessage, MessageSerializer, boolean, List, ResponseStatusCode, Map, Map)}
* should be used instead of this method whenever a {@link ResponseHandlerContext} is available.
*/
protected static Frame makeFrame(final ChannelHandlerContext ctx, final RequestMessage msg,
final MessageSerializer serializer, final boolean useBinary, final List<Object> aggregate,
- final ResponseStatusCode code, final Map<String,Object> responseMetaData) throws Exception {
+ final ResponseStatusCode code, final Map<String,Object> responseMetaData,
+ final Map<String,Object> statusAttributes) throws Exception {
final Context context = new Context(msg, ctx, null, null, null, null); // dummy context, good only for writing response messages to the channel
final ResponseHandlerContext rhc = new ResponseHandlerContext(context);
- return makeFrame(rhc, msg, serializer, useBinary, aggregate, code, responseMetaData);
+ return makeFrame(rhc, msg, serializer, useBinary, aggregate, code, responseMetaData, statusAttributes);
}
protected static Frame makeFrame(final ResponseHandlerContext rhc, final RequestMessage msg,
final MessageSerializer serializer, final boolean useBinary, final List<Object> aggregate,
- final ResponseStatusCode code, final Map<String,Object> responseMetaData) throws Exception {
+ final ResponseStatusCode code, final Map<String,Object> responseMetaData,
+ final Map<String,Object> statusAttributes) throws Exception {
final ChannelHandlerContext ctx = rhc.getContext().getChannelHandlerContext();
try {
if (useBinary) {
return new Frame(serializer.serializeResponseAsBinary(ResponseMessage.build(msg)
.code(code)
+ .statusAttributes(statusAttributes)
.responseMetaData(responseMetaData)
.result(aggregate).create(), ctx.alloc()));
} else {
@@ -269,6 +306,7 @@ public abstract class AbstractOpProcessor implements OpProcessor {
final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
return new Frame(textSerializer.serializeResponseAsString(ResponseMessage.build(msg)
.code(code)
+ .statusAttributes(statusAttributes)
.responseMetaData(responseMetaData)
.result(aggregate).create()));
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a9ab8219/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
index ca035c7..e5383ac 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
@@ -449,6 +449,9 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
@Override
protected Map<String, Object> generateMetaData(final ChannelHandlerContext ctx, final RequestMessage msg,
final ResponseStatusCode code, final Iterator itty) {
+ // leaving this overriding the deprecated version of this method because it provides a decent test to those
+ // who might have their own OpProcessor implementations that apply meta-data. leaving this alone helps validate
+ // that the upgrade path is clean. we can remove this in 3.4.0
Map<String, Object> metaData = Collections.emptyMap();
if (itty instanceof SideEffectIterator) {
final SideEffectIterator traversalIterator = (SideEffectIterator) itty;
@@ -458,6 +461,9 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
metaData.put(Tokens.ARGS_SIDE_EFFECT_KEY, key);
metaData.put(Tokens.ARGS_AGGREGATE_TO, traversalIterator.getSideEffectAggregator());
}
+ } else {
+ // this is a standard traversal iterator
+ metaData = super.generateMetaData(ctx, msg, code, itty);
}
return metaData;
@@ -530,7 +536,9 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
// thread that processed the eval of the script so, we have to push serialization down into that
Frame frame = null;
try {
- frame = makeFrame(ctx, msg, serializer, useBinary, aggregate, code, generateMetaData(ctx, msg, code, itty));
+ frame = makeFrame(ctx, msg, serializer, useBinary, aggregate, code,
+ generateResultMetaData(ctx, msg, code, itty, settings),
+ generateStatusAttributes(ctx, msg, code, itty, settings));
} catch (Exception ex) {
// a frame may use a Bytebuf which is a countable release - if it does not get written
// downstream it needs to be released here
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a9ab8219/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinResultSetIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinResultSetIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinResultSetIntegrateTest.java
index 66e2c94..bd71f1b 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinResultSetIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinResultSetIntegrateTest.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.Tokens;
import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
@@ -53,6 +54,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.is;
@@ -88,6 +90,13 @@ public class GremlinResultSetIntegrateTest extends AbstractGremlinServerIntegrat
}
@Test
+ public void shouldReturnResponseAttributes() throws Exception {
+ final ResultSet results = client.submit("g.V()");
+ final Map<String,Object> attr = results.statusAttributes().get(20000, TimeUnit.MILLISECONDS);
+ assertThat(attr.containsKey(Tokens.ARGS_HOST), is(true));
+ }
+
+ @Test
public void shouldHandleVertexResultFromTraversalBulked() throws Exception {
final Graph graph = TinkerGraph.open();
final GraphTraversalSource g = graph.traversal();