You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/06/30 17:56:03 UTC
[01/10] incubator-tinkerpop git commit: Use read complete future
instead of polling in ResultSet.all() TINKERPOP3-734
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 060093546 -> 0a1861dec
Use read complete future instead of polling in ResultSet.all() TINKERPOP3-734
This change had the additional side-effect of bubbling up some more detailed exception messages to the client. Improvements to javadoc.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/724f619a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/724f619a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/724f619a
Branch: refs/heads/master
Commit: 724f619aa74892a29c875e28f8b3936ec54619d9
Parents: d3a4b56
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Thu Jun 25 18:21:59 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Jun 25 18:21:59 2015 -0400
----------------------------------------------------------------------
.../tinkerpop/gremlin/driver/Connection.java | 3 ++-
.../tinkerpop/gremlin/driver/Handler.java | 5 ++++
.../tinkerpop/gremlin/driver/ResultQueue.java | 6 +++++
.../tinkerpop/gremlin/driver/ResultSet.java | 26 +++++++++++++-------
.../driver/handler/WebSocketClientHandler.java | 1 +
.../server/GremlinDriverIntegrateTest.java | 9 +++----
.../server/GremlinServerIntegrateTest.java | 6 ++---
7 files changed, 37 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 364dade..4bd2237 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -175,13 +175,14 @@ final class Connection {
if (isClosed() && pending.isEmpty())
shutdown(closeFuture.get());
}, cluster.executor());
+
final ResultQueue handler = new ResultQueue(resultLinkedBlockingQueue, readCompleted);
pending.put(requestMessage.getRequestId(), handler);
final ResultSet resultSet = new ResultSet(handler, cluster.executor(), channel,
() -> {
pending.remove(requestMessage.getRequestId());
return null;
- });
+ }, readCompleted);
future.complete(resultSet);
}
});
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index 1ce7f8f..fe27528 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -87,6 +87,11 @@ final class Handler {
// messages queue will not clear. wonder if there is some way to cope with that. of course, if
// there are that many failures someone would take notice and hopefully stop the client.
logger.error("Could not process the response - correct the problem and restart the driver.", cause);
+
+ // the channel is getting closed because of something pretty bad so release all the completable
+ // futures out there
+ pending.entrySet().stream().forEach(kv -> kv.getValue().markError(cause));
+
ctx.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index 5f1022f..6f33343 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.driver;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -81,6 +82,11 @@ final class ResultQueue {
return result;
}
+ public void drainTo(final Collection<Result> collection) {
+ if (error.get() != null) throw new RuntimeException(error.get());
+ resultLinkedBlockingQueue.drainTo(collection);
+ }
+
public Status getStatus() {
return status;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index 9f987bc..74ce9b2 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -37,6 +37,11 @@ import java.util.stream.StreamSupport;
* therefore may not be available immediately. As such, {@code ResultSet} provides access to a number
* of functions that help to work with the asynchronous nature of the data streaming back. Data from results
* is stored in an {@link Result} which can be used to retrieve the item once it is on the client side.
+ * <p/>
+ * Note that a {@code ResultSet} is a forward-only stream, so depending on how the methods are called and
+ * interacted with, it is possible to return partial bits of total response (e.g. calling {@link #one()} followed
+ * by {@link #all()} will make it so that the {@link List} of results returned from {@link #all()} have one
+ * {@link Result} missing from the total set as it was already retrieved by {@link #one()}).
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
@@ -46,19 +51,23 @@ public final class ResultSet implements Iterable<Result> {
private final Channel channel;
private final Supplier<Void> onChannelError;
+ private final CompletableFuture<Void> readCompleted;
+
public ResultSet(final ResultQueue resultQueue, final ExecutorService executor,
- final Channel channel, final Supplier<Void> onChannelError) {
+ final Channel channel, final Supplier<Void> onChannelError,
+ final CompletableFuture<Void> readCompleted) {
this.executor = executor;
this.resultQueue = resultQueue;
this.channel = channel;
this.onChannelError = onChannelError;
+ this.readCompleted = readCompleted;
}
/**
* Determines if all items have been returned to the client.
*/
public boolean allItemsAvailable() {
- return resultQueue.getStatus() == ResultQueue.Status.COMPLETE;
+ return readCompleted.isDone();
}
/**
@@ -110,16 +119,15 @@ public final class ResultSet implements Iterable<Result> {
}
/**
- * Wait for all items to be available on the client exhausting the stream.
+ * The returned {@link CompletableFuture} completes when all reads are complete for this request and the
+ * entire result has been accounted for on the client. While this method is named "all" it really refers to
+ * retrieving all remaining items in the set. For large result sets it is preferred to use
+ * {@link Iterator} or {@link Stream} options, because this method holds all remaining results in memory at once.
*/
public CompletableFuture<List<Result>> all() {
- return CompletableFuture.supplyAsync(() -> {
+ return readCompleted.thenApplyAsync(it -> {
final List<Result> list = new ArrayList<>();
- while (!isExhausted()) {
- final Result result = resultQueue.poll();
- if (result != null)
- list.add(result);
- }
+ resultQueue.drainTo(list);
return list;
}, executor);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 81f1e41..922775e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -98,5 +98,6 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
logger.warn("Exception caught during WebSocket processing - closing connection", cause);
if (!handshakeFuture.isDone()) handshakeFuture.setFailure(cause);
ctx.close();
+ ctx.fireExceptionCaught(cause);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index e8b8f1e..f86cf18 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -53,6 +53,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.*;
/**
@@ -114,7 +115,7 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
} catch (Exception ex) {
final Throwable inner = ExceptionUtils.getRootCause(ex);
assertTrue(inner instanceof RuntimeException);
- assertEquals("Error while processing results from channel - check client and server logs for more information", inner.getMessage());
+ assertThat(inner.getMessage(), startsWith("Encountered unregistered class ID:"));
}
cluster.close();
@@ -385,10 +386,8 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
client.submit("'" + fatty + "'").all().get();
fail("Should throw an exception.");
} catch (Exception re) {
- // can't seem to catch the server side exception - as the channel is basically closed on this error
- // can only detect a closed channel and react to that. in some ways this is a good general piece of
- // code to have in place, but kinda stinky when you want something specific about why all went bad
- assertTrue(re.getCause().getMessage().equals("Error while processing results from channel - check client and server logs for more information"));
+ Throwable root = ExceptionUtils.getRootCause(re);
+ assertTrue(root.getMessage().equals("Max frame length of 1 has been exceeded."));
} finally {
cluster.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/724f619a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index 9a7607c..9397397 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -396,10 +396,8 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
resultSet.all().get();
fail("Should throw an exception.");
} catch (Exception re) {
- // can't seem to catch the server side exception - as the channel is basically closed on this error
- // can only detect a closed channel and react to that. in some ways this is a good general piece of
- // code to have in place, but kinda stinky when you want something specific about why all went bad
- assertTrue(re.getCause().getMessage().equals("Error while processing results from channel - check client and server logs for more information"));
+ Throwable root = ExceptionUtils.getRootCause(re);
+ assertEquals("Connection reset by peer", root.getMessage());
} finally {
cluster.close();
}
[06/10] incubator-tinkerpop git commit: Merge remote-tracking branch
'origin/master' into resultset-refactor
Posted by sp...@apache.org.
Merge remote-tracking branch 'origin/master' into resultset-refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/a1c8972d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/a1c8972d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/a1c8972d
Branch: refs/heads/master
Commit: a1c8972d51d2a7325fd4d97b8057d3e4cd046674
Parents: 382ccbf 87e4e77
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Mon Jun 29 07:17:35 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Mon Jun 29 07:17:35 2015 -0400
----------------------------------------------------------------------
CHANGELOG.asciidoc | 1 +
bin/gephi-mock.py | 48 +++++++++++++++++
bin/gephi.mock | 24 ---------
bin/process-docs.sh | 18 ++++++-
docs/src/gremlin-applications.asciidoc | 5 +-
docs/src/the-traversal.asciidoc | 57 +++++++++++++++++++-
gremlin-console/pom.xml | 2 +-
.../GephiRemoteAcceptorIntegrateTest.java | 17 +++++-
.../tinkerpop/gremlin/structure/Graph.java | 2 +-
.../gremlin/structure/Transaction.java | 8 ++-
.../structure/util/AbstractTransaction.java | 4 +-
.../driver/util/ProfilingApplication.java | 15 ++++--
.../gremlin/structure/FeatureSupportTest.java | 2 +-
.../gremlin/structure/TransactionTest.java | 2 +-
14 files changed, 159 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
[02/10] incubator-tinkerpop git commit: Merge remote-tracking branch
'origin/master' into resultset-refactor
Posted by sp...@apache.org.
Merge remote-tracking branch 'origin/master' into resultset-refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/015ea494
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/015ea494
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/015ea494
Branch: refs/heads/master
Commit: 015ea494b6d2c20297325e405fe834c0abf02dee
Parents: 724f619 6a94788
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Jun 26 07:47:19 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri Jun 26 07:47:19 2015 -0400
----------------------------------------------------------------------
bin/gephi.mock | 24 ++++
bin/process-docs.sh | 2 +
docs/src/gremlin-applications.asciidoc | 74 +++++-------
docs/src/implementations.asciidoc | 2 +-
docs/src/intro.asciidoc | 6 +-
docs/src/the-traversal.asciidoc | 54 +++++----
.../traversal/dsl/graph/GraphTraversal.java | 6 +-
.../gremlin/process/traversal/dsl/graph/__.java | 7 --
.../process/traversal/step/map/MatchStep.java | 118 ++++++++++++-------
.../optimization/MatchPredicateStrategy.java | 5 +-
.../traversal/step/map/MatchStepTest.java | 56 ++++++---
.../MatchPredicateStrategyTest.java | 5 +-
.../traversal/step/map/GroovyMatchTest.groovy | 74 ++++++------
.../process/TraversalPerformanceTest.java | 2 +-
.../process/traversal/step/map/MatchTest.java | 104 ++++++++--------
.../neo4j/process/NativeNeo4jCypherTest.java | 16 +--
.../tinkergraph/structure/TinkerGraphTest.java | 20 ++--
17 files changed, 321 insertions(+), 254 deletions(-)
----------------------------------------------------------------------
[08/10] incubator-tinkerpop git commit: Merge remote-tracking branch
'origin/resultset-refactor'
Posted by sp...@apache.org.
Merge remote-tracking branch 'origin/resultset-refactor'
Conflicts:
gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b27be302
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b27be302
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b27be302
Branch: refs/heads/master
Commit: b27be3026520fb730ea7ab1027cc4673aeff6947
Parents: 99e0920 f65e6ca
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue Jun 30 11:52:10 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Tue Jun 30 11:52:10 2015 -0400
----------------------------------------------------------------------
.../tinkerpop/gremlin/driver/Connection.java | 3 +-
.../tinkerpop/gremlin/driver/Handler.java | 5 +
.../tinkerpop/gremlin/driver/ResultQueue.java | 91 ++++++++++-----
.../tinkerpop/gremlin/driver/ResultSet.java | 69 ++++-------
.../driver/handler/WebSocketClientHandler.java | 1 +
.../server/GremlinDriverIntegrateTest.java | 114 +++++++++++++++++--
.../server/GremlinServerIntegrateTest.java | 6 +-
7 files changed, 202 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b27be302/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --cc gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 21f14b8,1b5f6c5..273b5db
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@@ -54,11 -54,9 +55,13 @@@ import java.util.stream.Collectors
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+ import static org.hamcrest.CoreMatchers.is;
+ import static org.hamcrest.core.StringStartsWith.startsWith;
-import static org.junit.Assert.*;
/**
* Integration tests for gremlin-driver configurations and settings.
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b27be302/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
----------------------------------------------------------------------
[09/10] incubator-tinkerpop git commit: Update changelog.
Posted by sp...@apache.org.
Update changelog.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/5ebc3670
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/5ebc3670
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/5ebc3670
Branch: refs/heads/master
Commit: 5ebc3670e2d8221ed2f5c706125b483a1a02b66f
Parents: b27be30
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue Jun 30 11:55:30 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Tue Jun 30 11:55:30 2015 -0400
----------------------------------------------------------------------
CHANGELOG.asciidoc | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5ebc3670/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 0f9f98e..2af44b7 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -29,7 +29,8 @@ TinkerPop 3.0.0.GA (NOT OFFICIALLY RELEASED YET)
* Turned transactional testing back on in Gremlin Server using Neo4j.
* Renamed `Transaction.create()` to `Transaction.createThreadedTx()`.
* Added `TraversalParent.removeGlobalChild()` and `TraversalParent.removeLocalChild()`.
-* Add a `clear` option to the Gephi Plugin to empty the Gephi workspace.
+* Added a `clear` option to the Gephi Plugin to empty the Gephi workspace.
+* Refactored `ResultSet` and related classes to stop polling for results.
* `AbstractStep` now guarantees that bulk-less and null-valued traversers are never propagated.
* Added `dedup(string...)` which allows for the deduplication of a stream based on unique scope values.
* Fixed multiple bugs in the Gephi Plugin related to refactoring of traversal side-effects.
[03/10] incubator-tinkerpop git commit: Merge remote-tracking branch
'origin/master' into resultset-refactor
Posted by sp...@apache.org.
Merge remote-tracking branch 'origin/master' into resultset-refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/35122afa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/35122afa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/35122afa
Branch: refs/heads/master
Commit: 35122afa3fedfc494c43d5c0d8053af2dd4c8225
Parents: 015ea49 8044315
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Jun 26 08:24:18 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri Jun 26 08:24:18 2015 -0400
----------------------------------------------------------------------
.../apache/tinkerpop/gremlin/driver/Result.java | 20 +++-
.../tinkerpop/gremlin/driver/ResultTest.java | 34 +++++-
.../server/GremlinDriverIntegrateTest.java | 22 ----
.../server/GremlinResultSetIntegrateTest.java | 116 +++++++++++++++++++
4 files changed, 168 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/35122afa/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
[04/10] incubator-tinkerpop git commit: Merge remote-tracking branch
'origin/master' into resultset-refactor
Posted by sp...@apache.org.
Merge remote-tracking branch 'origin/master' into resultset-refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ac53d5a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ac53d5a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ac53d5a6
Branch: refs/heads/master
Commit: ac53d5a6b8da8052a53a159931a642170f7d928c
Parents: 35122af d5b70be
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Jun 26 13:34:43 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri Jun 26 13:34:43 2015 -0400
----------------------------------------------------------------------
CHANGELOG.asciidoc | 2 +
bin/process-docs.sh | 2 +-
docs/src/the-traversal.asciidoc | 26 ++++-----
.../process/traversal/step/map/MatchStep.java | 16 ++----
.../optimization/MatchPredicateStrategy.java | 26 +++++++--
.../process/traversal/util/TraversalHelper.java | 5 +-
.../traversal/step/map/MatchStepTest.java | 3 --
.../MatchPredicateStrategyTest.java | 14 ++++-
.../traversal/step/map/GroovyMatchTest.groovy | 8 +--
.../process/traversal/step/map/MatchTest.java | 17 +++---
.../gremlin/hadoop/structure/HadoopGraph.java | 4 +-
.../MultiMetaNeo4jGraphNativeNeo4jTest.java | 36 +++++++++++++
.../NoMultiNoMetaNeo4jGraphNativeNeo4jTest.java | 36 +++++++++++++
.../neo4j/process/NativeNeo4jCypherTest.java | 3 +-
.../MultiMetaNeo4jGraphNativeNeo4jTest.java | 35 ------------
.../NoMultiNoMetaNeo4jGraphNativeNeo4jTest.java | 35 ------------
.../tinkergraph/structure/TinkerGraphTest.java | 56 +++-----------------
17 files changed, 150 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
[07/10] incubator-tinkerpop git commit: Merge remote-tracking branch
'origin/master' into resultset-refactor
Posted by sp...@apache.org.
Merge remote-tracking branch 'origin/master' into resultset-refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/f65e6caa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/f65e6caa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/f65e6caa
Branch: refs/heads/master
Commit: f65e6caa0e9620e4fea4c74bd79317053b0ce6a5
Parents: a1c8972 c33468c
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Mon Jun 29 11:54:31 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Mon Jun 29 11:54:31 2015 -0400
----------------------------------------------------------------------
.../gremlin/driver/util/ProfilingApplication.java | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
[05/10] incubator-tinkerpop git commit: Refactored ResultSet pretty
heavily.
Posted by sp...@apache.org.
Refactored ResultSet pretty heavily.
Added a some() method to get some specified number of Result objects back and refactored the implementation of one().
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/382ccbfe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/382ccbfe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/382ccbfe
Branch: refs/heads/master
Commit: 382ccbfecbcbb4a6eff3121cd48b26f1fa9ea52d
Parents: ac53d5a
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Jun 26 13:57:58 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri Jun 26 13:57:58 2015 -0400
----------------------------------------------------------------------
.../tinkerpop/gremlin/driver/ResultQueue.java | 91 ++++++++++------
.../tinkerpop/gremlin/driver/ResultSet.java | 43 ++------
.../server/GremlinDriverIntegrateTest.java | 105 ++++++++++++++++++-
3 files changed, 168 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/382ccbfe/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index 6f33343..cc65e4d 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -19,34 +19,41 @@
package org.apache.tinkerpop.gremlin.driver;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.javatuples.Pair;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* A queue of incoming {@link ResponseMessage} objects. The queue is updated by the
- * {@link Handler.GremlinResponseHandler} until a response terminator is identified. At that point the fetch
- * status is changed to {@link Status#COMPLETE} and all results have made it client side.
+ * {@link Handler.GremlinResponseHandler} until a response terminator is identified.
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
final class ResultQueue {
- public enum Status {
- FETCHING,
- COMPLETE
- }
private final LinkedBlockingQueue<Result> resultLinkedBlockingQueue;
- private volatile Status status = Status.FETCHING;
-
private final AtomicReference<Throwable> error = new AtomicReference<>();
private final CompletableFuture<Void> readComplete;
+ private final Queue<Pair<CompletableFuture<List<Result>>,Integer>> waiting = new ConcurrentLinkedQueue<>();
+
+ /**
+ * Tracks the state of the "waiting" queue and whether or not results have been drained through it on
+ * read complete. If they are then no additional "waiting" is required.
+ */
+ private final AtomicBoolean flushed = new AtomicBoolean(false);
+
public ResultQueue(final LinkedBlockingQueue<Result> resultLinkedBlockingQueue, final CompletableFuture<Void> readComplete) {
this.resultLinkedBlockingQueue = resultLinkedBlockingQueue;
this.readComplete = readComplete;
@@ -54,6 +61,38 @@ final class ResultQueue {
public void add(final Result result) {
this.resultLinkedBlockingQueue.offer(result);
+
+ final Pair<CompletableFuture<List<Result>>, Integer> nextWaiting = waiting.peek();
+ if (nextWaiting != null && (resultLinkedBlockingQueue.size() > nextWaiting.getValue1() || readComplete.isDone())) {
+ final List<Result> results = new ArrayList<>(nextWaiting.getValue1());
+ resultLinkedBlockingQueue.drainTo(results, nextWaiting.getValue1());
+ nextWaiting.getValue0().complete(results);
+ waiting.remove(nextWaiting);
+ }
+ }
+
+ public CompletableFuture<List<Result>> await(final int items) {
+ final CompletableFuture<List<Result>> result = new CompletableFuture<>();
+ if (size() > items || readComplete.isDone()) {
+ // items are present so just drain to requested size if possible then complete it
+ final List<Result> results = new ArrayList<>(items);
+ resultLinkedBlockingQueue.drainTo(results, items);
+ result.complete(results);
+ } else {
+ // not enough items in the result queue so save this for callback later when the results actually arrive.
+ // only necessary to "wait" if we're not in the act of flushing already, in which case, no more waiting
+ // for additional results should be allowed.
+ if (flushed.get()) {
+ // just drain since we've flushed already
+ final List<Result> results = new ArrayList<>(items);
+ resultLinkedBlockingQueue.drainTo(results, items);
+ result.complete(results);
+ } else {
+ waiting.add(Pair.with(result, items));
+ }
+ }
+
+ return result;
}
public int size() {
@@ -66,38 +105,30 @@ final class ResultQueue {
return this.size() == 0;
}
- public Result poll() {
- Result result = null;
- do {
- if (error.get() != null) throw new RuntimeException(error.get());
- try {
- result = resultLinkedBlockingQueue.poll(10, TimeUnit.MILLISECONDS);
- } catch (InterruptedException ie) {
- error.set(new RuntimeException(ie));
- }
- } while (null == result && status == Status.FETCHING);
-
- if (error.get() != null) throw new RuntimeException(error.get());
-
- return result;
- }
-
public void drainTo(final Collection<Result> collection) {
if (error.get() != null) throw new RuntimeException(error.get());
resultLinkedBlockingQueue.drainTo(collection);
}
- public Status getStatus() {
- return status;
- }
-
+ /**
+ * Completes the {@code readComplete} future and then completes any futures still parked in
+ * {@code waiting} with whatever results remain in the queue.
+ */
void markComplete() {
- this.status = Status.COMPLETE;
this.readComplete.complete(null);
+ this.flushWaiting();
}
+ /**
+ * Records {@code throwable} as this queue's error, then completes the {@code readComplete} future
+ * (normally, with {@code null}) and completes any futures still parked in {@code waiting}.
+ */
void markError(final Throwable throwable) {
error.set(throwable);
this.readComplete.complete(null);
+ this.flushWaiting();
+ }
+
+ /**
+ * Completes every future parked in {@code waiting}, in queue order, handing each one up to the number of
+ * results it asked for from whatever is left in the result queue (possibly none), and then marks the
+ * queue as flushed so that later requests are no longer parked.
+ */
+ private void flushWaiting() {
+ // poll() tests and removes in a single step; a separate peek() followed by poll() can hand back null
+ // if another thread empties the queue in between, which would fail on the dereference below.
+ Pair<CompletableFuture<List<Result>>, Integer> nextWaiting;
+ while ((nextWaiting = waiting.poll()) != null) {
+ final List<Result> results = new ArrayList<>(nextWaiting.getValue1());
+ resultLinkedBlockingQueue.drainTo(results, nextWaiting.getValue1());
+ nextWaiting.getValue0().complete(results);
+ }
+
+ flushed.set(true);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/382ccbfe/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index 74ce9b2..66a9ff0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -81,41 +81,23 @@ public final class ResultSet implements Iterable<Result> {
* Determines if there are any remaining items being streamed to the client.
*/
public boolean isExhausted() {
- if (!resultQueue.isEmpty())
- return false;
-
- internalAwaitItems(1);
-
- assert !resultQueue.isEmpty() || allItemsAvailable();
- return resultQueue.isEmpty();
+ // exhausted only when all items are available and none of them remain in the result queue
+ return allItemsAvailable() && resultQueue.isEmpty();
}
/**
* Get the next {@link Result} from the stream, blocking until one is available.
+ *
+ * @return the next {@link Result}, or {@code null} if the stream finished with no items remaining
*/
public Result one() {
- Result result = resultQueue.poll();
- if (result != null)
- return result;
-
- internalAwaitItems(1);
-
- result = resultQueue.poll();
- if (result != null)
- return result;
- else
- return null;
+ // some(1) yields an empty list once the stream has finished and nothing is left, so guard the lookup
+ // rather than letting get(0) throw IndexOutOfBoundsException; returning null matches what the
+ // polling implementation removed above did in that situation.
+ final List<Result> results = some(1).join();
+ return results.isEmpty() ? null : results.get(0);
}
/**
- * Wait for some number of items to be available on the client. The future will contain the number of items
- * available which may or may not be the number the caller was waiting for.
+ * The returned {@link CompletableFuture} completes once enough items are available or the stream has
+ * finished. The list it yields never holds more than the number of items requested, and holds fewer only
+ * when the stream has completed with fewer than that number remaining.
+ *
+ * @param items the maximum number of items to return
+ * @return a future yielding at most {@code items} results, possibly none
*/
- public CompletableFuture<Integer> awaitItems(final int items) {
- if (allItemsAvailable())
- CompletableFuture.completedFuture(getAvailableItemCount());
-
- return CompletableFuture.supplyAsync(() -> internalAwaitItems(items), executor);
+ public CompletableFuture<List<Result>> some(final int items) {
+ return resultQueue.await(items);
}
/**
@@ -159,15 +141,4 @@ public final class ResultSet implements Iterable<Result> {
}
};
}
-
- private int internalAwaitItems(final int items) {
- while (!allItemsAvailable() && getAvailableItemCount() < items) {
- if (!channel.isOpen()) {
- onChannelError.get();
- throw new RuntimeException("Error while processing results from channel - check client and server logs for more information");
- }
- }
-
- return getAvailableItemCount();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/382ccbfe/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 1417ce9..1b5f6c5 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -41,6 +41,7 @@ import org.junit.rules.TestName;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -53,6 +54,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.*;
@@ -170,6 +172,96 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
final AtomicInteger counter = new AtomicInteger(0);
results.stream().map(i -> i.get(Integer.class) * 2).forEach(i -> assertEquals(counter.incrementAndGet() * 2, Integer.parseInt(i.toString())));
+ assertEquals(9, counter.get());
+ assertThat(results.allItemsAvailable(), is(true));
+ assertThat(results.isExhausted(), is(true));
+
+ // can't stream it again
+ assertThat(results.stream().iterator().hasNext(), is(false));
+
+ cluster.close();
+ }
+
+ @Test
+ public void shouldIterate() throws Exception {
+ final Cluster cluster = Cluster.open();
+ final Client client = cluster.connect();
+
+ final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+
+ // walk the iterator, checking that the n-th item handed back carries the value n
+ int seen = 0;
+ for (final Iterator<Result> remaining = results.iterator(); remaining.hasNext(); ) {
+ seen++;
+ assertEquals(seen, remaining.next().getInt());
+ }
+
+ assertEquals(9, seen);
+ assertThat(results.allItemsAvailable(), is(true));
+ assertThat(results.isExhausted(), is(true));
+
+ // a second iterator has nothing left to hand out
+ assertThat(results.iterator().hasNext(), is(false));
+
+ cluster.close();
+ }
+
+ @Test
+ public void shouldGetSomeThenSomeMore() throws Exception {
+ final Cluster cluster = Cluster.open();
+ final Client client = cluster.connect();
+
+ final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+
+ // all three requests are issued up front, before any of them is waited on
+ final CompletableFuture<List<Result>> firstRequest = results.some(5);
+ final CompletableFuture<List<Result>> secondRequest = results.some(5);
+ final CompletableFuture<List<Result>> thirdRequest = results.some(5);
+
+ // first request is filled completely: 1..5
+ final List<Result> firstBatch = firstRequest.get();
+ assertEquals(5, firstBatch.size());
+ for (int ix = 0; ix < firstBatch.size(); ix++) {
+ assertEquals(ix + 1, firstBatch.get(ix).getInt());
+ }
+ assertThat(results.isExhausted(), is(false));
+
+ // second request only gets what is left: 6..9
+ final List<Result> secondBatch = secondRequest.get();
+ assertEquals(4, secondBatch.size());
+ for (int ix = 0; ix < secondBatch.size(); ix++) {
+ assertEquals(ix + 6, secondBatch.get(ix).getInt());
+ }
+ assertThat(results.isExhausted(), is(true));
+
+ // third request finds nothing
+ assertEquals(0, thirdRequest.get().size());
+
+ cluster.close();
+ }
+
+ @Test
+ public void shouldGetOneThenSomeThenSomeMore() throws Exception {
+ final Cluster cluster = Cluster.open();
+ final Client client = cluster.connect();
+
+ final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+
+ // take a single item first, then issue all three batch requests before waiting on any of them
+ final Result head = results.one();
+ final CompletableFuture<List<Result>> firstRequest = results.some(4);
+ final CompletableFuture<List<Result>> secondRequest = results.some(5);
+ final CompletableFuture<List<Result>> thirdRequest = results.some(5);
+
+ assertEquals(1, head.getInt());
+
+ // first request is filled completely: 2..5
+ final List<Result> firstBatch = firstRequest.get();
+ assertEquals(4, firstBatch.size());
+ for (int ix = 0; ix < firstBatch.size(); ix++) {
+ assertEquals(ix + 2, firstBatch.get(ix).getInt());
+ }
+
+ // second request only gets what is left: 6..9
+ final List<Result> secondBatch = secondRequest.get();
+ assertEquals(4, secondBatch.size());
+ for (int ix = 0; ix < secondBatch.size(); ix++) {
+ assertEquals(ix + 6, secondBatch.get(ix).getInt());
+ }
+ assertThat(results.isExhausted(), is(true));
+
+ // third request finds nothing
+ assertEquals(0, thirdRequest.get().size());
cluster.close();
}
@@ -177,10 +269,13 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
/**
* This test arose from this issue: https://github.org/apache/tinkerpop/tinkerpop3/issues/515
* <p/>
- * ResultSet.all returns a CompleteableFuture that blocks on the worker pool until isExausted returns false.
- * isExausted in turn needs a thread on the worker pool to even return. So its totally possible to consume all
+ * ResultSet.all returns a CompletableFuture that blocks on the worker pool until isExhausted returns false.
+ * isExhausted in turn needs a thread on the worker pool to even return. So it's totally possible to consume all
* threads on the worker pool waiting for .all to finish such that you can't even get one to wait for
- * isExausted to run.
+ * isExhausted to run.
+ * <p/>
+ * Note that all() doesn't work as described above anymore. It waits for callback on readComplete rather
+ * than blocking on isExhausted.
*/
@Test
public void shouldAvoidDeadlockOnCallToResultSetDotAll() throws Exception {
@@ -377,8 +472,8 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
final Client client = cluster.connect(name.getMethodName());
final ResultSet results1 = client.submit("x = [1,2,3,4,5,6,7,8,9]");
- final AtomicInteger counter = new AtomicInteger(0);
- results1.stream().map(i -> i.get(Integer.class) * 2).forEach(i -> assertEquals(counter.incrementAndGet() * 2, Integer.parseInt(i.toString())));
+ assertEquals(9, results1.all().get().size());
+ assertThat(results1.isExhausted(), is(true));
final ResultSet results2 = client.submit("x[0]+1");
assertEquals(2, results2.all().get().get(0).getInt());
[10/10] incubator-tinkerpop git commit: Merge remote-tracking branch
'origin/master'
Posted by sp...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/0a1861de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/0a1861de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/0a1861de
Branch: refs/heads/master
Commit: 0a1861dec61f5475319b4035e0385ae5dae84476
Parents: 5ebc367 0600935
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue Jun 30 11:55:53 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Tue Jun 30 11:55:53 2015 -0400
----------------------------------------------------------------------
CHANGELOG.asciidoc | 1 +
docs/src/the-graph.asciidoc | 4 +-
docs/src/the-traversal.asciidoc | 34 +-
docs/static/images/choose-step.png | Bin 120817 -> 118627 bytes
docs/static/images/has-step.png | Bin 92883 -> 93721 bytes
docs/static/images/step-types.png | Bin 129946 -> 129207 bytes
docs/static/images/tinkerpop3.graffle | 1124 ++++++++++--------
.../traversal/dsl/graph/GraphTraversal.java | 14 +-
.../gremlin/process/traversal/dsl/graph/__.java | 16 +-
.../traversal/step/map/SelectOneStep.java | 4 -
.../process/traversal/step/map/SelectStep.java | 42 +-
.../LambdaRestrictionStrategyTest.java | 2 +-
.../step/filter/GroovyDedupTest.groovy | 4 +-
.../step/filter/GroovyRangeTest.groovy | 16 +-
.../traversal/step/filter/GroovyTailTest.groovy | 8 +-
.../step/filter/GroovyWhereTest.groovy | 32 +-
.../traversal/step/map/GroovyAddEdgeTest.groovy | 4 +-
.../traversal/step/map/GroovyMatchTest.groovy | 16 +-
.../traversal/step/map/GroovyOrderTest.groovy | 4 +-
.../traversal/step/map/GroovySelectTest.groovy | 58 +-
.../traversal/step/filter/DedupTest.java | 10 +-
.../traversal/step/filter/RangeTest.java | 42 +-
.../process/traversal/step/filter/TailTest.java | 21 +-
.../traversal/step/filter/WhereTest.java | 80 +-
.../process/traversal/step/map/AddEdgeTest.java | 10 +-
.../process/traversal/step/map/MatchTest.java | 41 +-
.../process/traversal/step/map/OrderTest.java | 12 +-
.../process/traversal/step/map/SelectTest.java | 225 ++--
.../gremlin/hadoop/structure/HadoopGraph.java | 4 +-
.../neo4j/process/NativeNeo4jCypherTest.java | 28 +-
.../tinkergraph/structure/TinkerGraphTest.java | 2 +-
31 files changed, 929 insertions(+), 929 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0a1861de/CHANGELOG.asciidoc
----------------------------------------------------------------------