You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/06/26 00:24:49 UTC

incubator-tinkerpop git commit: Use read complete future instead of polling in ResultSet.all() TINKERPOP3-734

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/resultset-refactor [created] 724f619aa


Use read complete future instead of polling in ResultSet.all() TINKERPOP3-734

This change had the additional side-effect of bubbling up some more detailed exception messages to the client.  Improvements to javadoc.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/724f619a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/724f619a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/724f619a

Branch: refs/heads/resultset-refactor
Commit: 724f619aa74892a29c875e28f8b3936ec54619d9
Parents: d3a4b56
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Thu Jun 25 18:21:59 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Jun 25 18:21:59 2015 -0400

----------------------------------------------------------------------
 .../tinkerpop/gremlin/driver/Connection.java    |  3 ++-
 .../tinkerpop/gremlin/driver/Handler.java       |  5 ++++
 .../tinkerpop/gremlin/driver/ResultQueue.java   |  6 +++++
 .../tinkerpop/gremlin/driver/ResultSet.java     | 26 +++++++++++++-------
 .../driver/handler/WebSocketClientHandler.java  |  1 +
 .../server/GremlinDriverIntegrateTest.java      |  9 +++----
 .../server/GremlinServerIntegrateTest.java      |  6 ++---
 7 files changed, 37 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 364dade..4bd2237 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -175,13 +175,14 @@ final class Connection {
                             if (isClosed() && pending.isEmpty())
                                 shutdown(closeFuture.get());
                         }, cluster.executor());
+
                         final ResultQueue handler = new ResultQueue(resultLinkedBlockingQueue, readCompleted);
                         pending.put(requestMessage.getRequestId(), handler);
                         final ResultSet resultSet = new ResultSet(handler, cluster.executor(), channel,
                                 () -> {
                                     pending.remove(requestMessage.getRequestId());
                                     return null;
-                                });
+                                }, readCompleted);
                         future.complete(resultSet);
                     }
                 });

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index 1ce7f8f..fe27528 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -87,6 +87,11 @@ final class Handler {
             // messages queue will not clear.  wonder if there is some way to cope with that.  of course, if
             // there are that many failures someone would take notice and hopefully stop the client.
             logger.error("Could not process the response - correct the problem and restart the driver.", cause);
+
+            // the channel is getting closed because of something pretty bad so release all the completable
+            // futures out there
+            pending.entrySet().stream().forEach(kv -> kv.getValue().markError(cause));
+
             ctx.close();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index 5f1022f..6f33343 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.driver;
 
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -81,6 +82,11 @@ final class ResultQueue {
         return result;
     }
 
+    public void drainTo(final Collection<Result> collection) {
+        if (error.get() != null) throw new RuntimeException(error.get());
+        resultLinkedBlockingQueue.drainTo(collection);
+    }
+
     public Status getStatus() {
         return status;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index 9f987bc..74ce9b2 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -37,6 +37,11 @@ import java.util.stream.StreamSupport;
  * therefore may not be available immediately.  As such, {@code ResultSet} provides access to a a number
  * of functions that help to work with the asynchronous nature of the data streaming back.  Data from results
  * is stored in an {@link Result} which can be used to retrieve the item once it is on the client side.
+ * <p/>
+ * Note that a {@code ResultSet} is a forward-only stream, so depending on how the methods are called and
+ * interacted with, it is possible to return only part of the total response (e.g. calling {@link #one()} followed
+ * by {@link #all()} will make it so that the {@link List} of results returned from {@link #all()} has one
+ * {@link Result} missing from the total set, as it was already retrieved by {@link #one()}).
  *
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
@@ -46,19 +51,23 @@ public final class ResultSet implements Iterable<Result> {
     private final Channel channel;
     private final Supplier<Void> onChannelError;
 
+    private final CompletableFuture<Void> readCompleted;
+
     public ResultSet(final ResultQueue resultQueue, final ExecutorService executor,
-                     final Channel channel, final Supplier<Void> onChannelError) {
+                     final Channel channel, final Supplier<Void> onChannelError,
+                     final CompletableFuture<Void> readCompleted) {
         this.executor = executor;
         this.resultQueue = resultQueue;
         this.channel = channel;
         this.onChannelError = onChannelError;
+        this.readCompleted = readCompleted;
     }
 
     /**
      * Determines if all items have been returned to the client.
      */
     public boolean allItemsAvailable() {
-        return resultQueue.getStatus() == ResultQueue.Status.COMPLETE;
+        return readCompleted.isDone();
     }
 
     /**
@@ -110,16 +119,15 @@ public final class ResultSet implements Iterable<Result> {
     }
 
     /**
-     * Wait for all items to be available on the client exhausting the stream.
+     * The returned {@link CompletableFuture} completes when all reads are complete for this request and the
+     * entire result has been accounted for on the client. While this method is named "all" it really refers to
+     * retrieving all remaining items in the set.  For large result sets it is preferred to use the
+     * {@link Iterator} or {@link Stream} options, as this method holds all of those results in memory at once.
      */
     public CompletableFuture<List<Result>> all() {
-        return CompletableFuture.supplyAsync(() -> {
+        return readCompleted.thenApplyAsync(it -> {
             final List<Result> list = new ArrayList<>();
-            while (!isExhausted()) {
-                final Result result = resultQueue.poll();
-                if (result != null)
-                    list.add(result);
-            }
+            resultQueue.drainTo(list);
             return list;
         }, executor);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 81f1e41..922775e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -98,5 +98,6 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
         logger.warn("Exception caught during WebSocket processing - closing connection", cause);
         if (!handshakeFuture.isDone()) handshakeFuture.setFailure(cause);
         ctx.close();
+        ctx.fireExceptionCaught(cause);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index e8b8f1e..f86cf18 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -53,6 +53,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.core.StringStartsWith.startsWith;
 import static org.junit.Assert.*;
 
 /**
@@ -114,7 +115,7 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         } catch (Exception ex) {
             final Throwable inner = ExceptionUtils.getRootCause(ex);
             assertTrue(inner instanceof RuntimeException);
-            assertEquals("Error while processing results from channel - check client and server logs for more information", inner.getMessage());
+            assertThat(inner.getMessage(), startsWith("Encountered unregistered class ID:"));
         }
 
         cluster.close();
@@ -385,10 +386,8 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
             client.submit("'" + fatty + "'").all().get();
             fail("Should throw an exception.");
         } catch (Exception re) {
-            // can't seem to catch the server side exception - as the channel is basically closed on this error
-            // can only detect a closed channel and react to that.  in some ways this is a good general piece of
-            // code to have in place, but kinda stinky when you want something specific about why all went bad
-            assertTrue(re.getCause().getMessage().equals("Error while processing results from channel - check client and server logs for more information"));
+            Throwable root = ExceptionUtils.getRootCause(re);
+            assertTrue(root.getMessage().equals("Max frame length of 1 has been exceeded."));
         } finally {
             cluster.close();
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index 9a7607c..9397397 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -396,10 +396,8 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
             resultSet.all().get();
             fail("Should throw an exception.");
         } catch (Exception re) {
-            // can't seem to catch the server side exception - as the channel is basically closed on this error
-            // can only detect a closed channel and react to that.  in some ways this is a good general piece of
-            // code to have in place, but kinda stinky when you want something specific about why all went bad
-            assertTrue(re.getCause().getMessage().equals("Error while processing results from channel - check client and server logs for more information"));
+            Throwable root = ExceptionUtils.getRootCause(re);
+            assertEquals("Connection reset by peer", root.getMessage());
         } finally {
             cluster.close();
         }