You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/06/26 19:59:28 UTC

[6/6] incubator-tinkerpop git commit: Refactored ResultSet pretty heavily.

Refactored ResultSet pretty heavily.

Added a some() method to get some specified number of Result objects back and refactored the implementation of one().


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/382ccbfe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/382ccbfe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/382ccbfe

Branch: refs/heads/resultset-refactor
Commit: 382ccbfecbcbb4a6eff3121cd48b26f1fa9ea52d
Parents: ac53d5a
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Jun 26 13:57:58 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri Jun 26 13:57:58 2015 -0400

----------------------------------------------------------------------
 .../tinkerpop/gremlin/driver/ResultQueue.java   |  91 ++++++++++------
 .../tinkerpop/gremlin/driver/ResultSet.java     |  43 ++------
 .../server/GremlinDriverIntegrateTest.java      | 105 ++++++++++++++++++-
 3 files changed, 168 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/382ccbfe/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index 6f33343..cc65e4d 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -19,34 +19,41 @@
 package org.apache.tinkerpop.gremlin.driver;
 
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.javatuples.Pair;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A queue of incoming {@link ResponseMessage} objects.  The queue is updated by the
- * {@link Handler.GremlinResponseHandler} until a response terminator is identified.  At that point the fetch
- * status is changed to {@link Status#COMPLETE} and all results have made it client side.
+ * {@link Handler.GremlinResponseHandler} until a response terminator is identified.
  *
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
 final class ResultQueue {
-    public enum Status {
-        FETCHING,
-        COMPLETE
-    }
 
     private final LinkedBlockingQueue<Result> resultLinkedBlockingQueue;
 
-    private volatile Status status = Status.FETCHING;
-
     private final AtomicReference<Throwable> error = new AtomicReference<>();
 
     private final CompletableFuture<Void> readComplete;
 
+    private final Queue<Pair<CompletableFuture<List<Result>>,Integer>> waiting = new ConcurrentLinkedQueue<>();
+
+    /**
+     * Tracks the state of the "waiting" queue and whether or not results have been drained through it on
+     * read complete.  If they are then no additional "waiting" is required.
+     */
+    private final AtomicBoolean flushed = new AtomicBoolean(false);
+
     public ResultQueue(final LinkedBlockingQueue<Result> resultLinkedBlockingQueue, final CompletableFuture<Void> readComplete) {
         this.resultLinkedBlockingQueue = resultLinkedBlockingQueue;
         this.readComplete = readComplete;
@@ -54,6 +61,38 @@ final class ResultQueue {
 
     public void add(final Result result) {
         this.resultLinkedBlockingQueue.offer(result);
+
+        final Pair<CompletableFuture<List<Result>>, Integer> nextWaiting = waiting.peek();
+        if (nextWaiting != null && (resultLinkedBlockingQueue.size() > nextWaiting.getValue1() || readComplete.isDone())) {
+            final List<Result> results = new ArrayList<>(nextWaiting.getValue1());
+            resultLinkedBlockingQueue.drainTo(results, nextWaiting.getValue1());
+            nextWaiting.getValue0().complete(results);
+            waiting.remove(nextWaiting);
+        }
+    }
+
+    public CompletableFuture<List<Result>> await(final int items) {
+        final CompletableFuture<List<Result>> result = new CompletableFuture<>();
+        if (size() > items || readComplete.isDone()) {
+            // items are present so just drain to requested size if possible then complete it
+            final List<Result> results = new ArrayList<>(items);
+            resultLinkedBlockingQueue.drainTo(results, items);
+            result.complete(results);
+        } else {
+            // not enough items in the result queue so save this for callback later when the results actually arrive.
+            // only necessary to "wait" if we're not in the act of flushing already, in which case, no more waiting
+            // for additional results should be allowed.
+            if (flushed.get()) {
+                // just drain since we've flushed already
+                final List<Result> results = new ArrayList<>(items);
+                resultLinkedBlockingQueue.drainTo(results, items);
+                result.complete(results);
+            } else {
+                waiting.add(Pair.with(result, items));
+            }
+        }
+
+        return result;
     }
 
     public int size() {
@@ -66,38 +105,30 @@ final class ResultQueue {
         return this.size() == 0;
     }
 
-    public Result poll() {
-        Result result = null;
-        do {
-            if (error.get() != null) throw new RuntimeException(error.get());
-            try {
-                result = resultLinkedBlockingQueue.poll(10, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException ie) {
-                error.set(new RuntimeException(ie));
-            }
-        } while (null == result && status == Status.FETCHING);
-
-        if (error.get() != null) throw new RuntimeException(error.get());
-
-        return result;
-    }
-
     public void drainTo(final Collection<Result> collection) {
         if (error.get() != null) throw new RuntimeException(error.get());
         resultLinkedBlockingQueue.drainTo(collection);
     }
 
-    public Status getStatus() {
-        return status;
-    }
-
     void markComplete() {
-        this.status = Status.COMPLETE;
         this.readComplete.complete(null);
+        this.flushWaiting();
     }
 
     void markError(final Throwable throwable) {
         error.set(throwable);
         this.readComplete.complete(null);
+        this.flushWaiting();
+    }
+
+    private void flushWaiting() {
+        while (waiting.peek() != null) {
+            final Pair<CompletableFuture<List<Result>>, Integer> nextWaiting = waiting.poll();
+            final List<Result> results = new ArrayList<>(nextWaiting.getValue1());
+            resultLinkedBlockingQueue.drainTo(results, nextWaiting.getValue1());
+            nextWaiting.getValue0().complete(results);
+        }
+
+        flushed.set(true);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/382ccbfe/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index 74ce9b2..66a9ff0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -81,41 +81,23 @@ public final class ResultSet implements Iterable<Result> {
      * Determines if there are any remaining items being streamed to the client.
      */
     public boolean isExhausted() {
-        if (!resultQueue.isEmpty())
-            return false;
-
-        internalAwaitItems(1);
-
-        assert !resultQueue.isEmpty() || allItemsAvailable();
-        return resultQueue.isEmpty();
+        return !(!allItemsAvailable() || !resultQueue.isEmpty());
     }
 
     /**
      * Get the next {@link Result} from the stream, blocking until one is available.
      */
     public Result one() {
-        Result result = resultQueue.poll();
-        if (result != null)
-            return result;
-
-        internalAwaitItems(1);
-
-        result = resultQueue.poll();
-        if (result != null)
-            return result;
-        else
-            return null;
+        return some(1).join().get(0);
     }
 
     /**
-     * Wait for some number of items to be available on the client. The future will contain the number of items
-     * available which may or may not be the number the caller was waiting for.
+     * The returned {@link CompletableFuture} completes when the number of items specified are available.  The
+     * number returned will be equal to or less than that number.  They will only be less if the stream is
+     * completed and there are less than that number specified available.
      */
-    public CompletableFuture<Integer> awaitItems(final int items) {
-        if (allItemsAvailable())
-            CompletableFuture.completedFuture(getAvailableItemCount());
-
-        return CompletableFuture.supplyAsync(() -> internalAwaitItems(items), executor);
+    public CompletableFuture<List<Result>> some(final int items) {
+        return resultQueue.await(items);
     }
 
     /**
@@ -159,15 +141,4 @@ public final class ResultSet implements Iterable<Result> {
             }
         };
     }
-
-    private int internalAwaitItems(final int items) {
-        while (!allItemsAvailable() && getAvailableItemCount() < items) {
-            if (!channel.isOpen()) {
-                onChannelError.get();
-                throw new RuntimeException("Error while processing results from channel - check client and server logs for more information");
-            }
-        }
-
-        return getAvailableItemCount();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/382ccbfe/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 1417ce9..1b5f6c5 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -41,6 +41,7 @@ import org.junit.rules.TestName;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -53,6 +54,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.core.StringStartsWith.startsWith;
 import static org.junit.Assert.*;
 
@@ -170,6 +172,96 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
         final AtomicInteger counter = new AtomicInteger(0);
         results.stream().map(i -> i.get(Integer.class) * 2).forEach(i -> assertEquals(counter.incrementAndGet() * 2, Integer.parseInt(i.toString())));
+        assertEquals(9, counter.get());
+        assertThat(results.allItemsAvailable(), is(true));
+        assertThat(results.isExhausted(), is(true));
+
+        // cant stream it again
+        assertThat(results.stream().iterator().hasNext(), is(false));
+
+        cluster.close();
+    }
+
+    @Test
+    public void shouldIterate() throws Exception {
+        final Cluster cluster = Cluster.open();
+        final Client client = cluster.connect();
+
+        final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+        final Iterator<Result> itty = results.iterator();
+        final AtomicInteger counter = new AtomicInteger(0);
+        while (itty.hasNext()) {
+            counter.incrementAndGet();
+            assertEquals(counter.get(), itty.next().getInt());
+        }
+
+        assertEquals(9, counter.get());
+        assertThat(results.allItemsAvailable(), is(true));
+        assertThat(results.isExhausted(), is(true));
+
+        // can't stream it again
+        assertThat(results.iterator().hasNext(), is(false));
+
+        cluster.close();
+    }
+
+    @Test
+    public void shouldGetSomeThenSomeMore() throws Exception {
+        final Cluster cluster = Cluster.open();
+        final Client client = cluster.connect();
+
+        final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+        final CompletableFuture<List<Result>> batch1 = results.some(5);
+        final CompletableFuture<List<Result>> batch2 = results.some(5);
+        final CompletableFuture<List<Result>> batchNothingLeft = results.some(5);
+
+        assertEquals(5, batch1.get().size());
+        assertEquals(1, batch1.get().get(0).getInt());
+        assertEquals(2, batch1.get().get(1).getInt());
+        assertEquals(3, batch1.get().get(2).getInt());
+        assertEquals(4, batch1.get().get(3).getInt());
+        assertEquals(5, batch1.get().get(4).getInt());
+        assertThat(results.isExhausted(), is(false));
+
+        assertEquals(4, batch2.get().size());
+        assertEquals(6, batch2.get().get(0).getInt());
+        assertEquals(7, batch2.get().get(1).getInt());
+        assertEquals(8, batch2.get().get(2).getInt());
+        assertEquals(9, batch2.get().get(3).getInt());
+        assertThat(results.isExhausted(), is(true));
+
+        assertEquals(0, batchNothingLeft.get().size());
+
+        cluster.close();
+    }
+
+    @Test
+    public void shouldGetOneThenSomeThenSomeMore() throws Exception {
+        final Cluster cluster = Cluster.open();
+        final Client client = cluster.connect();
+
+        final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+        final Result one = results.one();
+        final CompletableFuture<List<Result>> batch1 = results.some(4);
+        final CompletableFuture<List<Result>> batch2 = results.some(5);
+        final CompletableFuture<List<Result>> batchNothingLeft = results.some(5);
+
+        assertEquals(1, one.getInt());
+
+        assertEquals(4, batch1.get().size());
+        assertEquals(2, batch1.get().get(0).getInt());
+        assertEquals(3, batch1.get().get(1).getInt());
+        assertEquals(4, batch1.get().get(2).getInt());
+        assertEquals(5, batch1.get().get(3).getInt());
+
+        assertEquals(4, batch2.get().size());
+        assertEquals(6, batch2.get().get(0).getInt());
+        assertEquals(7, batch2.get().get(1).getInt());
+        assertEquals(8, batch2.get().get(2).getInt());
+        assertEquals(9, batch2.get().get(3).getInt());
+        assertThat(results.isExhausted(), is(true));
+
+        assertEquals(0, batchNothingLeft.get().size());
 
         cluster.close();
     }
@@ -177,10 +269,13 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     /**
      * This test arose from this issue: https://github.org/apache/tinkerpop/tinkerpop3/issues/515
      * <p/>
-     * ResultSet.all returns a CompleteableFuture that blocks on the worker pool until isExausted returns false.
-     * isExausted in turn needs a thread on the worker pool to even return. So its totally possible to consume all
+     * ResultSet.all returns a CompletableFuture that blocks on the worker pool until isExhausted returns false.
+     * isExhausted in turn needs a thread on the worker pool to even return. So its totally possible to consume all
      * threads on the worker pool waiting for .all to finish such that you can't even get one to wait for
-     * isExausted to run.
+     * isExhausted to run.
+     * <p/>
+     * Note that all() doesn't work as described above anymore.  It waits for callback on readComplete rather
+     * than blocking on isExhausted.
      */
     @Test
     public void shouldAvoidDeadlockOnCallToResultSetDotAll() throws Exception {
@@ -377,8 +472,8 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Client client = cluster.connect(name.getMethodName());
 
         final ResultSet results1 = client.submit("x = [1,2,3,4,5,6,7,8,9]");
-        final AtomicInteger counter = new AtomicInteger(0);
-        results1.stream().map(i -> i.get(Integer.class) * 2).forEach(i -> assertEquals(counter.incrementAndGet() * 2, Integer.parseInt(i.toString())));
+        assertEquals(9, results1.all().get().size());
+        assertThat(results1.isExhausted(), is(true));
 
         final ResultSet results2 = client.submit("x[0]+1");
         assertEquals(2, results2.all().get().get(0).getInt());