You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/05/27 14:27:10 UTC

incubator-tinkerpop git commit: Fixed for Result.one() which could have blocked indefinitely.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1196 [created] 3f8577596


Fixed for Result.one() which could have blocked indefinitely.

Result.one() would sometimes hang under rare but possible conditions. This change should prevent that from happening, as it removes the chance of await() futures being created while a flush of existing waiting futures is occurring. When that happened, it seemed possible for an await() future to be created but never completed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3f857759
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3f857759
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3f857759

Branch: refs/heads/TINKERPOP-1196
Commit: 3f8577596dee49493a75b039bf16e2e3f8e92793
Parents: 63e849c
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri May 27 10:23:54 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri May 27 10:23:54 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  1 +
 .../tinkerpop/gremlin/driver/ResultQueue.java   | 86 +++++++++-----------
 .../tinkerpop/gremlin/driver/ResultSet.java     |  6 +-
 3 files changed, 39 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3f857759/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 3d1f51f..cb183a6 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -28,6 +28,7 @@ TinkerPop 3.1.3 (NOT OFFICIALLY RELEASED YET)
 
 * Named the thread pool used by Gremlin Server sessions: "gremlin-server-session-$n".
 * Fixed a bug in `BulkSet.equals()` which made itself apparent when using `store()` and `aggregate()` with labeled `cap()`.
+* Fixed a bug where `Result.one()` could potentially block indefinitely under certain circumstances.
 * Ensured that all asserts of vertex and edge counts were being applied properly in the test suite.
 * Fixed bug in `gremlin-driver` where certain channel-level errors would not allow the driver to reconnect.
 * `SubgraphStep` now consults the parent graph features to determine cardinality of a property.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3f857759/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index da6a008..929cc09 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.driver;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.javatuples.Pair;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -29,7 +28,6 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -38,6 +36,7 @@ import java.util.concurrent.atomic.AtomicReference;
  *
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
+@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
 final class ResultQueue {
 
     private final LinkedBlockingQueue<Result> resultLinkedBlockingQueue;
@@ -48,12 +47,6 @@ final class ResultQueue {
 
     private final Queue<Pair<CompletableFuture<List<Result>>,Integer>> waiting = new ConcurrentLinkedQueue<>();
 
-    /**
-     * Tracks the state of the "waiting" queue and whether or not results have been drained through it on
-     * read complete.  If they are then no additional "waiting" is required.
-     */
-    private final AtomicBoolean flushed = new AtomicBoolean(false);
-
     public ResultQueue(final LinkedBlockingQueue<Result> resultLinkedBlockingQueue, final CompletableFuture<Void> readComplete) {
         this.resultLinkedBlockingQueue = resultLinkedBlockingQueue;
         this.readComplete = readComplete;
@@ -61,30 +54,14 @@ final class ResultQueue {
 
     public void add(final Result result) {
         this.resultLinkedBlockingQueue.offer(result);
-
-        final Pair<CompletableFuture<List<Result>>, Integer> nextWaiting = waiting.peek();
-        if (nextWaiting != null && (resultLinkedBlockingQueue.size() >= nextWaiting.getValue1() || readComplete.isDone())) {
-            internalDrain(nextWaiting.getValue1(), nextWaiting.getValue0(), resultLinkedBlockingQueue);
-            waiting.remove(nextWaiting);
-        }
+        tryDrainNextWaiting(false);
     }
 
     public CompletableFuture<List<Result>> await(final int items) {
         final CompletableFuture<List<Result>> result = new CompletableFuture<>();
-        if (size() >= items || readComplete.isDone()) {
-            // items are present so just drain to requested size if possible then complete it
-            internalDrain(items, result, resultLinkedBlockingQueue);
-        } else {
-            // not enough items in the result queue so save this for callback later when the results actually arrive.
-            // only necessary to "wait" if we're not in the act of flushing already, in which case, no more waiting
-            // for additional results should be allowed.
-            if (flushed.get()) {
-                // just drain since we've flushed already
-                internalDrain(items, result, resultLinkedBlockingQueue);
-            } else {
-                waiting.add(Pair.with(result, items));
-            }
-        }
+        waiting.add(Pair.with(result, items));
+
+        tryDrainNextWaiting(false);
 
         return result;
     }
@@ -99,42 +76,53 @@ final class ResultQueue {
         return this.size() == 0;
     }
 
-    public void drainTo(final Collection<Result> collection) {
+    void drainTo(final Collection<Result> collection) {
         if (error.get() != null) throw new RuntimeException(error.get());
         resultLinkedBlockingQueue.drainTo(collection);
     }
 
     void markComplete() {
         this.readComplete.complete(null);
-        this.flushWaiting();
+        this.drainAllWaiting();
     }
 
     void markError(final Throwable throwable) {
         error.set(throwable);
         this.readComplete.completeExceptionally(throwable);
-        this.flushWaiting();
+        this.drainAllWaiting();
     }
 
-    private void flushWaiting() {
-        while (waiting.peek() != null) {
-            final Pair<CompletableFuture<List<Result>>, Integer> nextWaiting = waiting.poll();
-            internalDrain(nextWaiting.getValue1(), nextWaiting.getValue0(), resultLinkedBlockingQueue);
-        }
+    /**
+     * Completes the next waiting future if there is one.
+     */
+    private synchronized void tryDrainNextWaiting(final boolean force) {
+        // need to peek because the number of available items needs to be >= the expected size for that future. if not
+        // it needs to keep waiting
+        final Pair<CompletableFuture<List<Result>>, Integer> nextWaiting = waiting.peek();
+        if (force || (nextWaiting != null && (resultLinkedBlockingQueue.size() >= nextWaiting.getValue1() || readComplete.isDone()))) {
+            final int items = nextWaiting.getValue1();
+            final CompletableFuture<List<Result>> future = nextWaiting.getValue0();
+            final List<Result> results = new ArrayList<>(items);
+            resultLinkedBlockingQueue.drainTo(results, items);
+
+            // it's important to check for error here because a future may have already been queued in "waiting" prior
+            // to the first response back from the server. if that happens, any "waiting" futures should be completed
+            // exceptionally otherwise it will look like success.
+            if (null == error.get())
+                future.complete(results);
+            else
+                future.completeExceptionally(error.get());
 
-        flushed.set(true);
+            waiting.remove(nextWaiting);
+        }
     }
 
-    private void internalDrain(final int items, final CompletableFuture<List<Result>> result,
-                                      final LinkedBlockingQueue<Result> resultLinkedBlockingQueue) {
-        final List<Result> results = new ArrayList<>(items);
-        resultLinkedBlockingQueue.drainTo(results, items);
-
-        // it's important to check for error here because a future may have already been queued in "waiting" prior
-        // to the first response back from the server. if that happens, any "waiting" futures should be completed
-        // exceptionally otherwise it will look like success.
-        if (null == error.get())
-            result.complete(results);
-        else
-            result.completeExceptionally(error.get());
+    /**
+     * Completes all remaining futures.
+     */
+    private void drainAllWaiting() {
+        while (!waiting.isEmpty()) {
+            tryDrainNextWaiting(true);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3f857759/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index 05f6fc0..d4e233b 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -127,11 +127,7 @@ public final class ResultSet implements Iterable<Result> {
 
             @Override
             public boolean hasNext() {
-                final List<Result> list = some(1).join();
-                assert list.size() <= 1;
-
-                nextOne = list.size() == 0 ? null : list.get(0);
-
+                nextOne = one();
                 return nextOne != null;
             }