You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/06/26 00:24:49 UTC
incubator-tinkerpop git commit: Use read complete future instead of
polling in ResultSet.all() TINKERPOP3-734
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/resultset-refactor [created] 724f619aa
Use read complete future instead of polling in ResultSet.all() TINKERPOP3-734
This change had the additional side-effect of bubbling up some more detailed exception messages to the client. Improvements to javadoc.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/724f619a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/724f619a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/724f619a
Branch: refs/heads/resultset-refactor
Commit: 724f619aa74892a29c875e28f8b3936ec54619d9
Parents: d3a4b56
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Thu Jun 25 18:21:59 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Jun 25 18:21:59 2015 -0400
----------------------------------------------------------------------
.../tinkerpop/gremlin/driver/Connection.java | 3 ++-
.../tinkerpop/gremlin/driver/Handler.java | 5 ++++
.../tinkerpop/gremlin/driver/ResultQueue.java | 6 +++++
.../tinkerpop/gremlin/driver/ResultSet.java | 26 +++++++++++++-------
.../driver/handler/WebSocketClientHandler.java | 1 +
.../server/GremlinDriverIntegrateTest.java | 9 +++----
.../server/GremlinServerIntegrateTest.java | 6 ++---
7 files changed, 37 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 364dade..4bd2237 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -175,13 +175,14 @@ final class Connection {
if (isClosed() && pending.isEmpty())
shutdown(closeFuture.get());
}, cluster.executor());
+
final ResultQueue handler = new ResultQueue(resultLinkedBlockingQueue, readCompleted);
pending.put(requestMessage.getRequestId(), handler);
final ResultSet resultSet = new ResultSet(handler, cluster.executor(), channel,
() -> {
pending.remove(requestMessage.getRequestId());
return null;
- });
+ }, readCompleted);
future.complete(resultSet);
}
});
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index 1ce7f8f..fe27528 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -87,6 +87,11 @@ final class Handler {
// messages queue will not clear. wonder if there is some way to cope with that. of course, if
// there are that many failures someone would take notice and hopefully stop the client.
logger.error("Could not process the response - correct the problem and restart the driver.", cause);
+
+ // the channel is getting closed because of something pretty bad so release all the completeable
+ // futures out there
+ pending.entrySet().stream().forEach(kv -> kv.getValue().markError(cause));
+
ctx.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index 5f1022f..6f33343 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.driver;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -81,6 +82,11 @@ final class ResultQueue {
return result;
}
+ public void drainTo(final Collection<Result> collection) {
+ if (error.get() != null) throw new RuntimeException(error.get());
+ resultLinkedBlockingQueue.drainTo(collection);
+ }
+
public Status getStatus() {
return status;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index 9f987bc..74ce9b2 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -37,6 +37,11 @@ import java.util.stream.StreamSupport;
* therefore may not be available immediately. As such, {@code ResultSet} provides access to a a number
* of functions that help to work with the asynchronous nature of the data streaming back. Data from results
* is stored in an {@link Result} which can be used to retrieve the item once it is on the client side.
+ * <p/>
+ * Note that a {@code ResultSet} is a forward-only stream only so depending on how the methods are called and
+ * interacted with, it is possible to return partial bits of total response (e.g. calling {@link #one()} followed
+ * by {@link #all()} will make it so that the {@link List} of results returned from {@link #all()} have one
+ * {@link Result} missing from the total set as it was already retrieved by {@link #one}.
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
@@ -46,19 +51,23 @@ public final class ResultSet implements Iterable<Result> {
private final Channel channel;
private final Supplier<Void> onChannelError;
+ private final CompletableFuture<Void> readCompleted;
+
public ResultSet(final ResultQueue resultQueue, final ExecutorService executor,
- final Channel channel, final Supplier<Void> onChannelError) {
+ final Channel channel, final Supplier<Void> onChannelError,
+ final CompletableFuture<Void> readCompleted) {
this.executor = executor;
this.resultQueue = resultQueue;
this.channel = channel;
this.onChannelError = onChannelError;
+ this.readCompleted = readCompleted;
}
/**
* Determines if all items have been returned to the client.
*/
public boolean allItemsAvailable() {
- return resultQueue.getStatus() == ResultQueue.Status.COMPLETE;
+ return readCompleted.isDone();
}
/**
@@ -110,16 +119,15 @@ public final class ResultSet implements Iterable<Result> {
}
/**
- * Wait for all items to be available on the client exhausting the stream.
+ * The returned {@link CompletableFuture} completes when all reads are complete for this request and the
+ * entire result has been accounted for on the client. While this method is named "all" it really refers to
+ * retrieving all remaining items in the set. For large result sets it is preferred to use
+ * {@link Iterator} or {@link Stream} options, as the results will be held in memory at once.
*/
public CompletableFuture<List<Result>> all() {
- return CompletableFuture.supplyAsync(() -> {
+ return readCompleted.thenApplyAsync(it -> {
final List<Result> list = new ArrayList<>();
- while (!isExhausted()) {
- final Result result = resultQueue.poll();
- if (result != null)
- list.add(result);
- }
+ resultQueue.drainTo(list);
return list;
}, executor);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 81f1e41..922775e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -98,5 +98,6 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
logger.warn("Exception caught during WebSocket processing - closing connection", cause);
if (!handshakeFuture.isDone()) handshakeFuture.setFailure(cause);
ctx.close();
+ ctx.fireExceptionCaught(cause);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index e8b8f1e..f86cf18 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -53,6 +53,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.*;
/**
@@ -114,7 +115,7 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
} catch (Exception ex) {
final Throwable inner = ExceptionUtils.getRootCause(ex);
assertTrue(inner instanceof RuntimeException);
- assertEquals("Error while processing results from channel - check client and server logs for more information", inner.getMessage());
+ assertThat(inner.getMessage(), startsWith("Encountered unregistered class ID:"));
}
cluster.close();
@@ -385,10 +386,8 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
client.submit("'" + fatty + "'").all().get();
fail("Should throw an exception.");
} catch (Exception re) {
- // can't seem to catch the server side exception - as the channel is basically closed on this error
- // can only detect a closed channel and react to that. in some ways this is a good general piece of
- // code to have in place, but kinda stinky when you want something specific about why all went bad
- assertTrue(re.getCause().getMessage().equals("Error while processing results from channel - check client and server logs for more information"));
+ Throwable root = ExceptionUtils.getRootCause(re);
+ assertTrue(root.getMessage().equals("Max frame length of 1 has been exceeded."));
} finally {
cluster.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index 9a7607c..9397397 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -396,10 +396,8 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
resultSet.all().get();
fail("Should throw an exception.");
} catch (Exception re) {
- // can't seem to catch the server side exception - as the channel is basically closed on this error
- // can only detect a closed channel and react to that. in some ways this is a good general piece of
- // code to have in place, but kinda stinky when you want something specific about why all went bad
- assertTrue(re.getCause().getMessage().equals("Error while processing results from channel - check client and server logs for more information"));
+ Throwable root = ExceptionUtils.getRootCause(re);
+ assertEquals("Connection reset by peer", root.getMessage());
} finally {
cluster.close();
}