You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/24 00:21:57 UTC

[04/35] incubator-tinkerpop git commit: TINKERPOP-1159 Prevented multiple close requests on call to Client.close()

TINKERPOP-1159 Prevented multiple close requests on call to Client.close()

This problem occurred around sessions specifically as there would be one call to shutdown() from both the call to Client.closeAsync() and from the call back to Client.write(). Just needed to change it so that the code of shutdown() only executed once no matter how many times it was called. CTR.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/888ac6a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/888ac6a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/888ac6a2

Branch: refs/heads/TINKERPOP-1166
Commit: 888ac6a23ee0d26bf8c2c3ab43b72a58120beffd
Parents: 7b8ce4b
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue Feb 16 12:10:29 2016 -0500
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Tue Feb 16 12:10:29 2016 -0500

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  1 +
 .../tinkerpop/gremlin/driver/Connection.java    | 62 ++++++++++++--------
 2 files changed, 37 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/888ac6a2/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 3e4cea3..b35efcb 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,7 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
 TinkerPop 3.1.2 (NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* Fixed a bug where multiple "close" requests were being sent by the driver on `Client.close()`.
 * Fixed an `Property` attach bug that shows up in serialization-based `GraphComputer` implementations.
 * Fixed a pom.xml bug where Gremlin Console/Server were not pulling the latest Neo4j 2.3.2.
 * Fixed bug in "round robin" load balancing in `gremlin-driver` where requests were wrongly being sent to the same host.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/888ac6a2/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index f3fbf0d..cb26781 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -75,6 +76,7 @@ final class Connection {
     private final Channelizer channelizer;
 
     private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
+    private final AtomicBoolean shutdownInitiated = new AtomicBoolean(false);
 
     public Connection(final URI uri, final ConnectionPool pool, final int maxInProcess) throws ConnectionException {
         this.uri = uri;
@@ -185,6 +187,9 @@ final class Connection {
                         final CompletableFuture<Void> readCompleted = new CompletableFuture<>();
                         readCompleted.thenAcceptAsync(v -> {
                             thisConnection.returnToPool();
+
+                            // close was signaled in closeAsync() but there were pending messages at that time. attempt
+                            // the shutdown if the returned result cleared up the last pending message
                             if (isClosed() && pending.isEmpty())
                                 shutdown(closeFuture.get());
                         }, cluster.executor());
@@ -209,35 +214,40 @@ final class Connection {
     }
 
     private void shutdown(final CompletableFuture<Void> future) {
-        if (client instanceof Client.SessionedClient) {
-            // maybe this should be delegated back to the Client implementation???
-            final RequestMessage closeMessage = client.buildMessage(RequestMessage.build(Tokens.OPS_CLOSE));
-            final CompletableFuture<ResultSet> closed = new CompletableFuture<>();
-            write(closeMessage, closed);
-
-            try {
-                // make sure we get a response here to validate that things closed as expected.  on error, we'll let
-                // the server try to clean up on its own.  the primary error here should probably be related to
-                // protocol issues which should not be something a user has to fuss with.
-                closed.get();
-            } catch (Exception ex) {
-                final String msg = String.format(
-                    "Encountered an error trying to close connection on %s - force closing - server will close session on shutdown or timeout.",
-                    ((Client.SessionedClient) client).getSessionId());
-                logger.warn(msg, ex);
+        // shutdown can be called directly from closeAsync() or after write() and therefore this method should only
+        // be called once. once shutdown is initiated, it shouldn't be executed a second time or else it sends more
+        // messages at the server and leads to ugly log messages over there.
+        if (shutdownInitiated.compareAndSet(false, true)) {
+            if (client instanceof Client.SessionedClient) {
+                // maybe this should be delegated back to the Client implementation???
+                final RequestMessage closeMessage = client.buildMessage(RequestMessage.build(Tokens.OPS_CLOSE));
+                final CompletableFuture<ResultSet> closed = new CompletableFuture<>();
+                write(closeMessage, closed);
+
+                try {
+                    // make sure we get a response here to validate that things closed as expected.  on error, we'll let
+                    // the server try to clean up on its own.  the primary error here should probably be related to
+                    // protocol issues which should not be something a user has to fuss with.
+                    closed.get();
+                } catch (Exception ex) {
+                    final String msg = String.format(
+                            "Encountered an error trying to close connection on %s - force closing - server will close session on shutdown or timeout.",
+                            ((Client.SessionedClient) client).getSessionId());
+                    logger.warn(msg, ex);
+                }
             }
-        }
 
-        channelizer.close(channel);
-        final ChannelPromise promise = channel.newPromise();
-        promise.addListener(f -> {
-            if (f.cause() != null)
-                future.completeExceptionally(f.cause());
-            else
-                future.complete(null);
-        });
+            channelizer.close(channel);
+            final ChannelPromise promise = channel.newPromise();
+            promise.addListener(f -> {
+                if (f.cause() != null)
+                    future.completeExceptionally(f.cause());
+                else
+                    future.complete(null);
+            });
 
-        channel.close(promise);
+            channel.close(promise);
+        }
     }
 
     public String getConnectionInfo() {