You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by pi...@apache.org on 2016/09/19 17:33:32 UTC
[44/50] [abbrv] tinkerpop git commit: Improved session cleanup on
client close.
Improved session cleanup on client close.
While not a perfect implementation, a long run job blocking a close request from the client will now at least get an attempt at interruption rather thant consuming the thread indefinitely. TINKERPOP-1442
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/75baf01e
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/75baf01e
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/75baf01e
Branch: refs/heads/TINKERPOP-1404
Commit: 75baf01e83e7db2cfd60850e9facf535cf10d887
Parents: e7e7481
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue Sep 13 18:10:09 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri Sep 16 07:29:39 2016 -0400
----------------------------------------------------------------------
CHANGELOG.asciidoc | 1 +
.../tinkerpop/gremlin/driver/Connection.java | 2 +-
.../gremlin/groovy/engine/GremlinExecutor.java | 2 +-
.../gremlin/server/op/session/Session.java | 12 +++++++
.../server/op/session/SessionOpProcessor.java | 9 ++++++
.../server/GremlinDriverIntegrateTest.java | 2 +-
.../server/GremlinServerIntegrateTest.java | 4 +--
.../GremlinServerSessionIntegrateTest.java | 33 ++++++++++++++++++++
8 files changed, 60 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/75baf01e/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 4d990ee..a9dae9d 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
TinkerPop 3.1.5 (Release Date: NOT OFFICIALLY RELEASED YET)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+* Improved session cleanup when a close is triggered by the client.
* Removed the `appveyor.yml` file as the AppVeyor build is no longer enabled by Apache Infrastructure.
* Fixed a bug in `RangeByIsCountStrategy` which didn't use the `NotStep` properly.
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/75baf01e/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 22e48fe..220ad42 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -92,7 +92,7 @@ final class Connection {
connectionLabel = String.format("Connection{host=%s}", pool.host);
- if (cluster.isClosing()) throw new IllegalStateException("Cannot open a connection while the cluster after close() is called");
+ if (cluster.isClosing()) throw new IllegalStateException("Cannot open a connection with the cluster after close() is called");
final Bootstrap b = this.cluster.getFactory().createBootstrap();
try {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/75baf01e/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
index da12e1e..785442a 100644
--- a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
+++ b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
@@ -312,7 +312,7 @@ public class GremlinExecutor implements AutoCloseable {
if (root instanceof InterruptedException) {
lifeCycle.getAfterTimeout().orElse(afterTimeout).accept(bindings);
evaluationFuture.completeExceptionally(new TimeoutException(
- String.format("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of %s ms for request [%s]: %s", scriptEvalTimeOut, script, root.getMessage())));
+ String.format("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of %s ms or evaluation was otherwise cancelled directly for request [%s]: %s", scriptEvalTimeOut, script, root.getMessage())));
} else {
lifeCycle.getAfterFailure().orElse(afterFailure).accept(bindings, root);
evaluationFuture.completeExceptionally(root);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/75baf01e/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
index 33b2752..c9bc7c1 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -52,6 +53,7 @@ public class Session {
private final ScheduledExecutorService scheduledExecutorService;
private final long configuredSessionTimeout;
+ private AtomicBoolean killing = new AtomicBoolean(false);
private AtomicReference<ScheduledFuture> kill = new AtomicReference<>();
/**
@@ -104,6 +106,10 @@ public class Session {
return session;
}
+ public boolean acceptingRequests() {
+ return !killing.get();
+ }
+
public void touch() {
// if the task of killing is cancelled successfully then reset the session monitor. otherwise this session
// has already been killed and there's nothing left to do with this session.
@@ -134,6 +140,8 @@ public class Session {
* Kills the session and rollback any uncommitted changes on transactional graphs.
*/
public synchronized void kill() {
+ killing.set(true);
+
// if the session has already been removed then there's no need to do this process again. it's possible that
// the manuallKill and the kill future could have both called kill at roughly the same time. this prevents
// kill() from being called more than once
@@ -157,6 +165,10 @@ public class Session {
}
}
});
+
+ // prevent any additional requests from processing now that the mass rollback has been completed
+ executor.shutdownNow();
+
sessions.remove(session);
logger.info("Session {} closed", session);
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/75baf01e/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
index 3497169..bec0c55 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
@@ -147,6 +147,15 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
final RequestMessage msg = context.getRequestMessage();
final Session session = getSession(context, msg);
+ // check if the session is still accepting requests - if not block further requests
+ if (!session.acceptingRequests()) {
+ final String sessionClosedMessage = String.format("Session %s is no longer accepting requests as it has been closed",
+ session.getSessionId());
+ final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+ .statusMessage(sessionClosedMessage).create();
+ throw new OpProcessorException(sessionClosedMessage, response);
+ }
+
// place the session on the channel context so that it can be used during serialization. in this way
// the serialization can occur on the same thread used to execute the gremlin within the session. this
// is important given the threadlocal nature of Graph implementation transactions.
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/75baf01e/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 7314243..1a04b6b 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -1258,7 +1258,7 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
{
final Throwable root = ExceptionUtils.getRootCause(ex);
assertThat(root, instanceOf(ResponseException.class));
- assertThat(root.getMessage(), startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of 250 ms for request"));
+ assertThat(root.getMessage(), startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of 250 ms"));
}
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/75baf01e/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index 2f091d9..0f0cdae 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -543,7 +543,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
public void shouldReceiveFailureTimeOutOnScriptEval() throws Exception {
try (SimpleClient client = new WebSocketClient()){
final List<ResponseMessage> responses = client.submit("Thread.sleep(3000);'some-stuff-that-should not return'");
- assertThat(responses.get(0).getStatus().getMessage(), startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of 200 ms for request"));
+ assertThat(responses.get(0).getStatus().getMessage(), startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of 200 ms"));
// validate that we can still send messages to the server
assertEquals(2, ((List<Integer>) client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
@@ -559,7 +559,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
.addArg(Tokens.ARGS_GREMLIN, "Thread.sleep(3000);'some-stuff-that-should not return'")
.create();
final List<ResponseMessage> responses = client.submit(msg);
- assertThat(responses.get(0).getStatus().getMessage(), startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of 100 ms for request"));
+ assertThat(responses.get(0).getStatus().getMessage(), startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of 100 ms"));
// validate that we can still send messages to the server
assertEquals(2, ((List<Integer>) client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/75baf01e/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
index 8b34038..99b3a1b 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
@@ -50,6 +50,7 @@ import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -109,6 +110,38 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInt
}
@Test
+ public void shouldBlockAdditionalRequestsDuringClose() throws Exception {
+ // this is sorta cobbled together a bit given limits/rules about how you can use Cluster/Client instances.
+ // basically, we need one to submit the long run job and one to do the close operation that will cancel the
+ // long run job. it is probably possible to do this with some low-level message manipulation but that's
+ // probably not necessary
+ final Cluster cluster1 = Cluster.build().create();
+ final Client client1 = cluster1.connect(name.getMethodName());
+ client1.submit("1+1").all().join();
+ final Cluster cluster2 = Cluster.build().create();
+ final Client client2 = cluster2.connect(name.getMethodName());
+ client2.submit("1+1").all().join();
+
+ final ResultSet rs = client1.submit("Thread.sleep(10000);1+1");
+
+ client2.close();
+
+ try {
+ rs.all().join();
+ fail("The close of the session on client2 should have interrupted the script sent on client1");
+ } catch (Exception ex) {
+ final Throwable root = ExceptionUtils.getRootCause(ex);
+ assertThat(root.getMessage(), startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of 30000 ms or evaluation was otherwise cancelled directly for request"));
+ }
+
+ client1.close();
+
+ cluster1.close();
+ cluster2.close();
+ }
+
+
+ @Test
public void shouldRollbackOnEvalExceptionForManagedTransaction() throws Exception {
assumeNeo4jIsPresent();