You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/06/26 19:59:28 UTC
[6/6] incubator-tinkerpop git commit: Refactored ResultSet pretty heavily.
Refactored ResultSet pretty heavily.
Added a some() method that returns up to a specified number of Result objects, and refactored the implementation of one() to use it.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/382ccbfe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/382ccbfe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/382ccbfe
Branch: refs/heads/resultset-refactor
Commit: 382ccbfecbcbb4a6eff3121cd48b26f1fa9ea52d
Parents: ac53d5a
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Jun 26 13:57:58 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri Jun 26 13:57:58 2015 -0400
----------------------------------------------------------------------
.../tinkerpop/gremlin/driver/ResultQueue.java | 91 ++++++++++------
.../tinkerpop/gremlin/driver/ResultSet.java | 43 ++------
.../server/GremlinDriverIntegrateTest.java | 105 ++++++++++++++++++-
3 files changed, 168 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/382ccbfe/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index 6f33343..cc65e4d 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -19,34 +19,41 @@
package org.apache.tinkerpop.gremlin.driver;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.javatuples.Pair;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* A queue of incoming {@link ResponseMessage} objects. The queue is updated by the
- * {@link Handler.GremlinResponseHandler} until a response terminator is identified. At that point the fetch
- * status is changed to {@link Status#COMPLETE} and all results have made it client side.
+ * {@link Handler.GremlinResponseHandler} until a response terminator is identified.
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
final class ResultQueue {
- public enum Status {
- FETCHING,
- COMPLETE
- }
private final LinkedBlockingQueue<Result> resultLinkedBlockingQueue;
- private volatile Status status = Status.FETCHING;
-
private final AtomicReference<Throwable> error = new AtomicReference<>();
private final CompletableFuture<Void> readComplete;
+ private final Queue<Pair<CompletableFuture<List<Result>>,Integer>> waiting = new ConcurrentLinkedQueue<>();
+
+ /**
+ * Tracks the state of the "waiting" queue and whether or not results have been drained through it on
+ * read complete. If they are then no additional "waiting" is required.
+ */
+ private final AtomicBoolean flushed = new AtomicBoolean(false);
+
public ResultQueue(final LinkedBlockingQueue<Result> resultLinkedBlockingQueue, final CompletableFuture<Void> readComplete) {
this.resultLinkedBlockingQueue = resultLinkedBlockingQueue;
this.readComplete = readComplete;
@@ -54,6 +61,38 @@ final class ResultQueue {
public void add(final Result result) {
this.resultLinkedBlockingQueue.offer(result);
+
+ final Pair<CompletableFuture<List<Result>>, Integer> nextWaiting = waiting.peek();
+ if (nextWaiting != null && (resultLinkedBlockingQueue.size() > nextWaiting.getValue1() || readComplete.isDone())) {
+ final List<Result> results = new ArrayList<>(nextWaiting.getValue1());
+ resultLinkedBlockingQueue.drainTo(results, nextWaiting.getValue1());
+ nextWaiting.getValue0().complete(results);
+ waiting.remove(nextWaiting);
+ }
+ }
+
+ public CompletableFuture<List<Result>> await(final int items) {
+ final CompletableFuture<List<Result>> result = new CompletableFuture<>();
+ if (size() > items || readComplete.isDone()) {
+ // items are present so just drain to requested size if possible then complete it
+ final List<Result> results = new ArrayList<>(items);
+ resultLinkedBlockingQueue.drainTo(results, items);
+ result.complete(results);
+ } else {
+ // not enough items in the result queue so save this for callback later when the results actually arrive.
+ // only necessary to "wait" if we're not in the act of flushing already, in which case, no more waiting
+ // for additional results should be allowed.
+ if (flushed.get()) {
+ // just drain since we've flushed already
+ final List<Result> results = new ArrayList<>(items);
+ resultLinkedBlockingQueue.drainTo(results, items);
+ result.complete(results);
+ } else {
+ waiting.add(Pair.with(result, items));
+ }
+ }
+
+ return result;
}
public int size() {
@@ -66,38 +105,30 @@ final class ResultQueue {
return this.size() == 0;
}
- public Result poll() {
- Result result = null;
- do {
- if (error.get() != null) throw new RuntimeException(error.get());
- try {
- result = resultLinkedBlockingQueue.poll(10, TimeUnit.MILLISECONDS);
- } catch (InterruptedException ie) {
- error.set(new RuntimeException(ie));
- }
- } while (null == result && status == Status.FETCHING);
-
- if (error.get() != null) throw new RuntimeException(error.get());
-
- return result;
- }
-
public void drainTo(final Collection<Result> collection) {
if (error.get() != null) throw new RuntimeException(error.get());
resultLinkedBlockingQueue.drainTo(collection);
}
- public Status getStatus() {
- return status;
- }
-
void markComplete() {
- this.status = Status.COMPLETE;
this.readComplete.complete(null);
+ this.flushWaiting();
}
void markError(final Throwable throwable) {
error.set(throwable);
this.readComplete.complete(null);
+ this.flushWaiting();
+ }
+
+ private void flushWaiting() {
+ while (waiting.peek() != null) {
+ final Pair<CompletableFuture<List<Result>>, Integer> nextWaiting = waiting.poll();
+ final List<Result> results = new ArrayList<>(nextWaiting.getValue1());
+ resultLinkedBlockingQueue.drainTo(results, nextWaiting.getValue1());
+ nextWaiting.getValue0().complete(results);
+ }
+
+ flushed.set(true);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/382ccbfe/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index 74ce9b2..66a9ff0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -81,41 +81,23 @@ public final class ResultSet implements Iterable<Result> {
* Determines if there are any remaining items being streamed to the client.
*/
public boolean isExhausted() {
- if (!resultQueue.isEmpty())
- return false;
-
- internalAwaitItems(1);
-
- assert !resultQueue.isEmpty() || allItemsAvailable();
- return resultQueue.isEmpty();
+ return !(!allItemsAvailable() || !resultQueue.isEmpty());
}
/**
* Get the next {@link Result} from the stream, blocking until one is available.
*/
public Result one() {
- Result result = resultQueue.poll();
- if (result != null)
- return result;
-
- internalAwaitItems(1);
-
- result = resultQueue.poll();
- if (result != null)
- return result;
- else
- return null;
+ return some(1).join().get(0);
}
/**
- * Wait for some number of items to be available on the client. The future will contain the number of items
- * available which may or may not be the number the caller was waiting for.
+ * The returned {@link CompletableFuture} completes when the number of items specified are available. The
+ * number returned will be equal to or less than that number. They will only be less if the stream is
+ * completed and there are less than that number specified available.
*/
- public CompletableFuture<Integer> awaitItems(final int items) {
- if (allItemsAvailable())
- CompletableFuture.completedFuture(getAvailableItemCount());
-
- return CompletableFuture.supplyAsync(() -> internalAwaitItems(items), executor);
+ public CompletableFuture<List<Result>> some(final int items) {
+ return resultQueue.await(items);
}
/**
@@ -159,15 +141,4 @@ public final class ResultSet implements Iterable<Result> {
}
};
}
-
- private int internalAwaitItems(final int items) {
- while (!allItemsAvailable() && getAvailableItemCount() < items) {
- if (!channel.isOpen()) {
- onChannelError.get();
- throw new RuntimeException("Error while processing results from channel - check client and server logs for more information");
- }
- }
-
- return getAvailableItemCount();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/382ccbfe/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 1417ce9..1b5f6c5 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -41,6 +41,7 @@ import org.junit.rules.TestName;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -53,6 +54,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.*;
@@ -170,6 +172,96 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
final AtomicInteger counter = new AtomicInteger(0);
results.stream().map(i -> i.get(Integer.class) * 2).forEach(i -> assertEquals(counter.incrementAndGet() * 2, Integer.parseInt(i.toString())));
+ assertEquals(9, counter.get());
+ assertThat(results.allItemsAvailable(), is(true));
+ assertThat(results.isExhausted(), is(true));
+
+ // cant stream it again
+ assertThat(results.stream().iterator().hasNext(), is(false));
+
+ cluster.close();
+ }
+
+ @Test
+ public void shouldIterate() throws Exception {
+ final Cluster cluster = Cluster.open();
+ final Client client = cluster.connect();
+
+ final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+ final Iterator<Result> itty = results.iterator();
+ final AtomicInteger counter = new AtomicInteger(0);
+ while (itty.hasNext()) {
+ counter.incrementAndGet();
+ assertEquals(counter.get(), itty.next().getInt());
+ }
+
+ assertEquals(9, counter.get());
+ assertThat(results.allItemsAvailable(), is(true));
+ assertThat(results.isExhausted(), is(true));
+
+ // can't stream it again
+ assertThat(results.iterator().hasNext(), is(false));
+
+ cluster.close();
+ }
+
+ @Test
+ public void shouldGetSomeThenSomeMore() throws Exception {
+ final Cluster cluster = Cluster.open();
+ final Client client = cluster.connect();
+
+ final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+ final CompletableFuture<List<Result>> batch1 = results.some(5);
+ final CompletableFuture<List<Result>> batch2 = results.some(5);
+ final CompletableFuture<List<Result>> batchNothingLeft = results.some(5);
+
+ assertEquals(5, batch1.get().size());
+ assertEquals(1, batch1.get().get(0).getInt());
+ assertEquals(2, batch1.get().get(1).getInt());
+ assertEquals(3, batch1.get().get(2).getInt());
+ assertEquals(4, batch1.get().get(3).getInt());
+ assertEquals(5, batch1.get().get(4).getInt());
+ assertThat(results.isExhausted(), is(false));
+
+ assertEquals(4, batch2.get().size());
+ assertEquals(6, batch2.get().get(0).getInt());
+ assertEquals(7, batch2.get().get(1).getInt());
+ assertEquals(8, batch2.get().get(2).getInt());
+ assertEquals(9, batch2.get().get(3).getInt());
+ assertThat(results.isExhausted(), is(true));
+
+ assertEquals(0, batchNothingLeft.get().size());
+
+ cluster.close();
+ }
+
+ @Test
+ public void shouldGetOneThenSomeThenSomeMore() throws Exception {
+ final Cluster cluster = Cluster.open();
+ final Client client = cluster.connect();
+
+ final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+ final Result one = results.one();
+ final CompletableFuture<List<Result>> batch1 = results.some(4);
+ final CompletableFuture<List<Result>> batch2 = results.some(5);
+ final CompletableFuture<List<Result>> batchNothingLeft = results.some(5);
+
+ assertEquals(1, one.getInt());
+
+ assertEquals(4, batch1.get().size());
+ assertEquals(2, batch1.get().get(0).getInt());
+ assertEquals(3, batch1.get().get(1).getInt());
+ assertEquals(4, batch1.get().get(2).getInt());
+ assertEquals(5, batch1.get().get(3).getInt());
+
+ assertEquals(4, batch2.get().size());
+ assertEquals(6, batch2.get().get(0).getInt());
+ assertEquals(7, batch2.get().get(1).getInt());
+ assertEquals(8, batch2.get().get(2).getInt());
+ assertEquals(9, batch2.get().get(3).getInt());
+ assertThat(results.isExhausted(), is(true));
+
+ assertEquals(0, batchNothingLeft.get().size());
cluster.close();
}
@@ -177,10 +269,13 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
/**
* This test arose from this issue: https://github.org/apache/tinkerpop/tinkerpop3/issues/515
* <p/>
- * ResultSet.all returns a CompleteableFuture that blocks on the worker pool until isExausted returns false.
- * isExausted in turn needs a thread on the worker pool to even return. So its totally possible to consume all
+ * ResultSet.all returns a CompletableFuture that blocks on the worker pool until isExhausted returns false.
+ * isExhausted in turn needs a thread on the worker pool to even return. So its totally possible to consume all
* threads on the worker pool waiting for .all to finish such that you can't even get one to wait for
- * isExausted to run.
+ * isExhausted to run.
+ * <p/>
+ * Note that all() doesn't work as described above anymore. It waits for callback on readComplete rather
+ * than blocking on isExhausted.
*/
@Test
public void shouldAvoidDeadlockOnCallToResultSetDotAll() throws Exception {
@@ -377,8 +472,8 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
final Client client = cluster.connect(name.getMethodName());
final ResultSet results1 = client.submit("x = [1,2,3,4,5,6,7,8,9]");
- final AtomicInteger counter = new AtomicInteger(0);
- results1.stream().map(i -> i.get(Integer.class) * 2).forEach(i -> assertEquals(counter.incrementAndGet() * 2, Integer.parseInt(i.toString())));
+ assertEquals(9, results1.all().get().size());
+ assertThat(results1.isExhausted(), is(true));
final ResultSet results2 = client.submit("x[0]+1");
assertEquals(2, results2.all().get().get(0).getInt());