You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/09/29 17:10:33 UTC

[1/3] tinkerpop git commit: Corrected a number of problems in close() operations for the driver. [Forced Update!]

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1467-master 15d9aa282 -> c58866892 (forced update)


Corrected a number of problems in close() operations for the driver.

This was more of a commit than I wanted for tp31, but close() was really messed up. Fixed a number of race conditions and other logic that would allow the driver to hang on close. Also made it so that the Cluster makes an attempt to clean up any Client instances that it spawns.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/0c871715
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/0c871715
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/0c871715

Branch: refs/heads/TINKERPOP-1467-master
Commit: 0c871715a794170841a3ea9e125b9389ee6ff54b
Parents: 0070d3d
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Thu Sep 29 08:40:07 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Sep 29 08:41:46 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  1 +
 .../upgrade/release-3.1.x-incubating.asciidoc   | 17 +++++
 .../tinkerpop/gremlin/driver/Channelizer.java   |  6 +-
 .../apache/tinkerpop/gremlin/driver/Client.java | 42 +++++++++--
 .../tinkerpop/gremlin/driver/Cluster.java       | 22 +++++-
 .../tinkerpop/gremlin/driver/Connection.java    | 57 ++++++++++++--
 .../gremlin/driver/ConnectionPool.java          | 21 +++---
 .../tinkerpop/gremlin/driver/Handler.java       | 27 +++++--
 .../tinkerpop/gremlin/driver/ResultQueue.java   |  4 +
 .../driver/handler/WebSocketClientHandler.java  |  4 +-
 .../server/GremlinDriverIntegrateTest.java      | 79 +++++++++++++++++++-
 .../server/GremlinServerAuthIntegrateTest.java  |  5 +-
 .../GremlinServerAuthOldIntegrateTest.java      |  4 +-
 .../GremlinServerSessionIntegrateTest.java      |  6 +-
 14 files changed, 247 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0c871715/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index d0aa8e8..9f45477 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 TinkerPop 3.1.5 (Release Date: NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* Improved handling of `Cluster.close()` and `Client.close()` to prevent the methods from hanging.
 * Fixed output redirection and potential memory leak in `GremlinGroovyScriptEngine`.
 * Corrected naming of `g_withPath_V_asXaX_out_out_mapXa_name_it_nameX` and `g_withPath_V_asXaX_out_mapXa_nameX` in `MapTest`.
 * Improved session cleanup when a close is triggered by the client.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0c871715/docs/src/upgrade/release-3.1.x-incubating.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/upgrade/release-3.1.x-incubating.asciidoc b/docs/src/upgrade/release-3.1.x-incubating.asciidoc
index 400ea10..6cf9fb2 100644
--- a/docs/src/upgrade/release-3.1.x-incubating.asciidoc
+++ b/docs/src/upgrade/release-3.1.x-incubating.asciidoc
@@ -27,6 +27,23 @@ TinkerPop 3.1.5
 
 *Release Date: NOT OFFICIALLY RELEASED YET*
 
+Please see the link:https://github.com/apache/tinkerpop/blob/3.1.4/CHANGELOG.asciidoc#tinkerpop-315-release-date-XXXXXXXXXXXX[changelog] for a complete list of all the modifications that are part of this release.
+
+Upgrading for Users
+~~~~~~~~~~~~~~~~~~~
+
+Java Driver and close()
+^^^^^^^^^^^^^^^^^^^^^^^
+
+There were a few problems noted around the `close()` of `Cluster` and `Client` instances, including issues that
+presented as system hangs. These issues have been resolved, however, it is worth noting that an unchecked exception
+that was thrown under a certain situation has changed as part of the bug fixes. When submitting an in-session request
+on a `Client` that was closed (or closing) an `IllegalStateException` is thrown. This replaces older functionality
+that threw a `ConnectionException` and relied on logic far deeper in the driver to produce that error and had the
+potential to open additional resources despite the intention of the user to "close".
+
+See: https://issues.apache.org/jira/browse/TINKERPOP-1467[TINKERPOP-1467]
+
 TinkerPop 3.1.4
 ---------------
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0c871715/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 40be11c..b3761b7 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -20,9 +20,6 @@ package org.apache.tinkerpop.gremlin.driver;
 
 import io.netty.channel.Channel;
 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
-import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinRequestEncoder;
 import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinResponseDecoder;
@@ -42,7 +39,6 @@ import io.netty.handler.ssl.SslContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
@@ -156,7 +152,7 @@ public interface Channelizer extends ChannelHandler {
          */
         @Override
         public void close(final Channel channel) {
-            channel.writeAndFlush(new CloseWebSocketFrame());
+            if (channel.isOpen()) channel.writeAndFlush(new CloseWebSocketFrame());
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0c871715/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 4aca9ca..3a03141 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
@@ -245,6 +246,8 @@ public abstract class Client {
      * A low-level method that allows the submission of a manually constructed {@link RequestMessage}.
      */
     public CompletableFuture<ResultSet> submitAsync(final RequestMessage msg) {
+        if (isClosing()) throw new IllegalStateException("Client has been closed");
+
         if (!initialized)
             init();
 
@@ -269,6 +272,8 @@ public abstract class Client {
         }
     }
 
+    public abstract boolean isClosing();
+
     /**
      * Closes the client by making a synchronous call to {@link #closeAsync()}.
      */
@@ -290,11 +295,17 @@ public abstract class Client {
     public final static class ClusteredClient extends Client {
 
         private ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>();
+        private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null);
 
         ClusteredClient(final Cluster cluster) {
             super(cluster);
         }
 
+        @Override
+        public boolean isClosing() {
+            return closing.get() != null;
+        }
+
         /**
          * Submits a Gremlin script to the server and returns a {@link ResultSet} once the write of the request is
          * complete.
@@ -364,10 +375,14 @@ public abstract class Client {
          * Closes all the connection pools on all hosts.
          */
         @Override
-        public CompletableFuture<Void> closeAsync() {
+        public synchronized CompletableFuture<Void> closeAsync() {
+            if (closing.get() != null)
+                return closing.get();
+
             final CompletableFuture[] poolCloseFutures = new CompletableFuture[hostConnectionPools.size()];
             hostConnectionPools.values().stream().map(ConnectionPool::closeAsync).collect(Collectors.toList()).toArray(poolCloseFutures);
-            return CompletableFuture.allOf(poolCloseFutures);
+            closing.set(CompletableFuture.allOf(poolCloseFutures));
+            return closing.get();
         }
     }
 
@@ -448,11 +463,16 @@ public abstract class Client {
          * close on the {@code Client} that created it.
          */
         @Override
-        public CompletableFuture<Void> closeAsync() {
+        public synchronized CompletableFuture<Void> closeAsync() {
             close.complete(null);
             return close;
         }
 
+        @Override
+        public boolean isClosing() {
+            return close.isDone();
+        }
+
         /**
          * {@inheritDoc}
          */
@@ -483,6 +503,8 @@ public abstract class Client {
 
         private ConnectionPool connectionPool;
 
+        private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null);
+
         SessionedClient(final Cluster cluster, final String sessionId, final boolean manageTransactions) {
             super(cluster);
             this.sessionId = sessionId;
@@ -526,12 +548,22 @@ public abstract class Client {
             connectionPool = new ConnectionPool(host, this, Optional.of(1), Optional.of(1));
         }
 
+        @Override
+        public boolean isClosing() {
+            return closing.get() != null;
+        }
+
         /**
          * Close the bound {@link ConnectionPool}.
          */
         @Override
-        public CompletableFuture<Void> closeAsync() {
-            return connectionPool.closeAsync();
+        public synchronized CompletableFuture<Void> closeAsync() {
+            if (closing.get() != null)
+                return closing.get();
+
+            final CompletableFuture<Void> connectionPoolClose = connectionPool.closeAsync();
+            closing.set(connectionPoolClose);
+            return connectionPoolClose;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0c871715/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index 6a6a2e3..473991a 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -36,6 +36,7 @@ import javax.net.ssl.TrustManager;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.lang.ref.WeakReference;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -83,7 +84,9 @@ public final class Cluster {
      * submitted or can be directly initialized via {@link Client#init()}.
      */
     public <T extends Client> T connect() {
-        return (T) new Client.ClusteredClient(this);
+        final Client client = new Client.ClusteredClient(this);
+        manager.trackClient(client);
+        return (T) client;
     }
 
     /**
@@ -122,7 +125,9 @@ public final class Cluster {
     public <T extends Client> T connect(final String sessionId, final boolean manageTransactions) {
         if (null == sessionId || sessionId.isEmpty())
             throw new IllegalArgumentException("sessionId cannot be null or empty");
-        return (T) new Client.SessionedClient(this, sessionId, manageTransactions);
+        final Client client = new Client.SessionedClient(this, sessionId, manageTransactions);
+        manager.trackClient(client);
+        return (T) client;
     }
 
     @Override
@@ -684,6 +689,8 @@ public final class Cluster {
 
         private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
 
+        private final List<WeakReference<Client>> openedClients = new ArrayList<>();
+
         private Manager(final Builder builder) {
             this.loadBalancingStrategy = builder.loadBalancingStrategy;
             this.authProps = builder.authProps;
@@ -730,6 +737,10 @@ public final class Cluster {
             });
         }
 
+        void trackClient(final Client client) {
+            openedClients.add(new WeakReference<>(client));
+        }
+
         public Host add(final InetSocketAddress address) {
             final Host newHost = new Host(address, Cluster.this);
             final Host previous = hosts.putIfAbsent(address, newHost);
@@ -745,6 +756,13 @@ public final class Cluster {
             if (closeFuture.get() != null)
                 return closeFuture.get();
 
+            for (WeakReference<Client> openedClient : openedClients) {
+                final Client client = openedClient.get();
+                if (client != null && !client.isClosing()) {
+                    client.close();
+                }
+            }
+
             final CompletableFuture<Void> closeIt = new CompletableFuture<>();
             closeFuture.set(closeIt);
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0c871715/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 220ad42..766db2e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -35,6 +35,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -148,22 +150,26 @@ final class Connection {
         return pending;
     }
 
-    public CompletableFuture<Void> closeAsync() {
+    public synchronized CompletableFuture<Void> closeAsync() {
+        if (isClosed()) return closeFuture.get();
+
         final CompletableFuture<Void> future = new CompletableFuture<>();
-        if (!closeFuture.compareAndSet(null, future))
-            return closeFuture.get();
+        closeFuture.set(future);
 
         // make sure all requests in the queue are fully processed before killing.  if they are then shutdown
         // can be immediate.  if not this method will signal the readCompleted future defined in the write()
         // operation to check if it can close.  in this way the connection no longer receives writes, but
         // can continue to read. If a request never comes back the future won't get fulfilled and the connection
         // will maintain a "pending" request, that won't quite ever go away.  The build up of such a dead requests
-        // on a connection in the connection pool will force the pool to replace the connection for a fresh one
+        // on a connection in the connection pool will force the pool to replace the connection for a fresh one.
         if (pending.isEmpty()) {
             if (null == channel)
                 future.complete(null);
             else
                 shutdown(future);
+        } else {
+            // there may be some pending requests. schedule a job to wait for those to complete and then shutdown
+            new CheckForPending(future).runUntilDone(cluster.executor(), 1000, TimeUnit.MILLISECONDS);
         }
 
         return future;
@@ -256,7 +262,7 @@ final class Connection {
             shutdown(closeFuture.get());
     }
 
-    private void shutdown(final CompletableFuture<Void> future) {
+    private synchronized void shutdown(final CompletableFuture<Void> future) {
         // shutdown can be called directly from closeAsync() or after write() and therefore this method should only
         // be called once. once shutdown is initiated, it shouldn't be executed a second time or else it sends more
         // messages at the server and leads to ugly log messages over there.
@@ -286,6 +292,7 @@ final class Connection {
             }
 
             channelizer.close(channel);
+
             final ChannelPromise promise = channel.newPromise();
             promise.addListener(f -> {
                 if (f.cause() != null)
@@ -307,4 +314,44 @@ final class Connection {
     public String toString() {
         return connectionLabel;
     }
+
+    /**
+     * Self-cancelling task that periodically checks for the pending queue to clear before shutting down the
+     * {@code Connection}. Once it does that, it self cancels the scheduled job in the executor.
+     */
+    private final class CheckForPending implements Runnable {
+        private volatile ScheduledFuture<?> self;
+        private final CompletableFuture<Void> future;
+
+        CheckForPending(final CompletableFuture<Void> future) {
+            this.future = future;
+        }
+
+        @Override
+        public void run() {
+            logger.info("Checking for pending messages to complete before close on {}", this);
+            if (pending.isEmpty()) {
+                shutdown(future);
+                boolean interrupted = false;
+                try {
+                    while(null == self) {
+                        try {
+                            Thread.sleep(1);
+                        } catch (InterruptedException e) {
+                            interrupted = true;
+                        }
+                    }
+                    self.cancel(false);
+                } finally {
+                    if(interrupted) {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        }
+
+        void runUntilDone(final ScheduledExecutorService executor, final long period, final TimeUnit unit) {
+            self = executor.scheduleAtFixedRate(this, period, period, unit);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0c871715/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index 9955e82..4691b1b 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -219,27 +219,26 @@ final class ConnectionPool {
     /**
      * Permanently kills the pool.
      */
-    public CompletableFuture<Void> closeAsync() {
-        logger.info("Signalled closing of connection pool on {} with core size of {}", host, minPoolSize);
+    public synchronized CompletableFuture<Void> closeAsync() {
+        if (closeFuture.get() != null) return closeFuture.get();
 
-        CompletableFuture<Void> future = closeFuture.get();
-        if (future != null)
-            return future;
+        logger.info("Signalled closing of connection pool on {} with core size of {}", host, minPoolSize);
 
         announceAllAvailableConnection();
-        future = CompletableFuture.allOf(killAvailableConnections());
-
-        return closeFuture.compareAndSet(null, future) ? future : closeFuture.get();
+        final CompletableFuture<Void> future = killAvailableConnections();
+        closeFuture.set(future);
+        return future;
     }
 
-    private CompletableFuture[] killAvailableConnections() {
+    private CompletableFuture<Void> killAvailableConnections() {
         final List<CompletableFuture<Void>> futures = new ArrayList<>(connections.size());
         for (Connection connection : connections) {
             final CompletableFuture<Void> future = connection.closeAsync();
-            future.thenRunAsync(open::decrementAndGet, cluster.executor());
+            future.thenRun(open::decrementAndGet);
             futures.add(future);
         }
-        return futures.toArray(new CompletableFuture[futures.size()]);
+
+        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
     }
 
     void replaceConnection(final Connection connection) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0c871715/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index 773322a..65eb662 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -29,11 +29,11 @@ import io.netty.util.Attribute;
 import io.netty.util.AttributeKey;
 import io.netty.util.ReferenceCountUtil;
 import org.apache.tinkerpop.gremlin.driver.ser.SerializationException;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
 import java.security.PrivilegedExceptionAction;
 import java.security.PrivilegedActionException;
 import java.util.HashMap;
@@ -83,11 +83,24 @@ final class Handler {
             if (response.getStatus().getCode() == ResponseStatusCode.AUTHENTICATE) {
                 final Attribute<SaslClient> saslClient = channelHandlerContext.attr(saslClientKey);
                 final Attribute<Subject> subject = channelHandlerContext.attr(subjectKey);
-                RequestMessage.Builder messageBuilder = RequestMessage.build(Tokens.OPS_AUTHENTICATION);
+                final RequestMessage.Builder messageBuilder = RequestMessage.build(Tokens.OPS_AUTHENTICATION);
                 // First time through we don't have a sasl client
                 if (saslClient.get() == null) {
                     subject.set(login());
-                    saslClient.set(saslClient(getHostName(channelHandlerContext)));
+                    try {
+                        saslClient.set(saslClient(getHostName(channelHandlerContext)));
+                    } catch (SaslException saslException) {
+                        // push the sasl error into a failure response from the server. this ensures that standard
+                        // processing for the ResultQueue is kept. without this SaslException trap and subsequent
+                        // conversion to an authentication failure, the close() of the connection might not
+                        // succeed as it will appear as though pending messages remain present in the queue on the
+                        // connection and the shutdown won't proceed
+                        final ResponseMessage clientSideError = ResponseMessage.build(response.getRequestId())
+                                .code(ResponseStatusCode.FORBIDDEN).statusMessage(saslException.getMessage()).create();
+                        channelHandlerContext.fireChannelRead(clientSideError);
+                        return;
+                    }
+
                     messageBuilder.addArg(Tokens.ARGS_SASL_MECHANISM, getMechanism());
                     messageBuilder.addArg(Tokens.ARGS_SASL, saslClient.get().hasInitialResponse() ?
                                                                 evaluateChallenge(subject, saslClient, NULL_CHALLENGE) : null);
@@ -214,12 +227,12 @@ final class Handler {
             // there are that many failures someone would take notice and hopefully stop the client.
             logger.error("Could not process the response", cause);
 
-            // the channel took an error because of something pretty bad so release all the completeable
-            // futures out there
-            pending.entrySet().stream().forEach(kv -> kv.getValue().markError(cause));
+            // the channel took an error because of something pretty bad so release all the futures out there
+            pending.values().forEach(val -> val.markError(cause));
+            pending.clear();
 
             // serialization exceptions should not close the channel - that's worth a retry
-            if (!ExceptionUtils.getThrowableList(cause).stream().anyMatch(t -> t instanceof SerializationException))
+            if (!IteratorUtils.anyMatch(ExceptionUtils.getThrowableList(cause).iterator(), t -> t instanceof SerializationException))
                 ctx.close();
         }
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0c871715/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index 89a0225..e55456e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -76,6 +76,10 @@ final class ResultQueue {
         return this.size() == 0;
     }
 
+    public boolean isComplete() {
+        return readComplete.isDone();
+    }
+
     void drainTo(final Collection<Result> collection) {
         if (error.get() != null) throw new RuntimeException(error.get());
         resultLinkedBlockingQueue.drainTo(collection);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0c871715/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 922775e..e2a5668 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -95,9 +95,9 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
 
     @Override
     public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
-        logger.warn("Exception caught during WebSocket processing - closing connection", cause);
         if (!handshakeFuture.isDone()) handshakeFuture.setFailure(cause);
-        ctx.close();
+
+        // let the GremlinResponseHandler take care of exception logging, channel closing, and cleanup
         ctx.fireExceptionCaught(cause);
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0c871715/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 1a04b6b..bf66b0d 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -25,7 +25,6 @@ import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
 import org.apache.tinkerpop.gremlin.driver.Result;
 import org.apache.tinkerpop.gremlin.driver.ResultSet;
-import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.driver.ser.JsonBuilderGryoSerializer;
@@ -746,11 +745,13 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         client.close();
 
         try {
-            client.submit("x[0]+1");
+            client.submit("x[0]+1").all().get();
             fail("Should have thrown an exception because the connection is closed");
         } catch (Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertThat(root, instanceOf(ConnectionException.class));
+            assertThat(root, instanceOf(IllegalStateException.class));
+        } finally {
+            cluster.close();
         }
     }
 
@@ -1248,6 +1249,78 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         }
     }
 
+    @Test
+    public void shouldCloseAllClientsOnCloseOfCluster() throws Exception {
+        final Cluster cluster = Cluster.open();
+        final Client sessionlessOne = cluster.connect();
+        final Client session = cluster.connect("session");
+        final Client sessionlessTwo = cluster.connect();
+        final Client sessionlessThree = cluster.connect();
+        final Client sessionlessFour = cluster.connect();
+
+        assertEquals(2, sessionlessOne.submit("1+1").all().get().get(0).getInt());
+        assertEquals(2, session.submit("1+1").all().get().get(0).getInt());
+        assertEquals(2, sessionlessTwo.submit("1+1").all().get().get(0).getInt());
+        assertEquals(2, sessionlessThree.submit("1+1").all().get().get(0).getInt());
+        // don't send anything on the 4th client
+
+        // close one of these Clients before the Cluster
+        sessionlessThree.close();
+        cluster.close();
+
+        try {
+            sessionlessOne.submit("1+1").all().get();
+            fail("Should have tossed an exception because cluster was closed");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(IllegalStateException.class));
+            assertEquals("Client has been closed", root.getMessage());
+        }
+
+        try {
+            session.submit("1+1").all().get();
+            fail("Should have tossed an exception because cluster was closed");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(IllegalStateException.class));
+            assertEquals("Client has been closed", root.getMessage());
+        }
+
+        try {
+            sessionlessTwo.submit("1+1").all().get();
+            fail("Should have tossed an exception because cluster was closed");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(IllegalStateException.class));
+            assertEquals("Client has been closed", root.getMessage());
+        }
+
+        try {
+            sessionlessThree.submit("1+1").all().get();
+            fail("Should have tossed an exception because cluster was closed");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(IllegalStateException.class));
+            assertEquals("Client has been closed", root.getMessage());
+        }
+
+        try {
+            sessionlessFour.submit("1+1").all().get();
+            fail("Should have tossed an exception because cluster was closed");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(IllegalStateException.class));
+            assertEquals("Client has been closed", root.getMessage());
+        }
+
+        // allow call to close() even though closed through cluster
+        sessionlessOne.close();
+        session.close();
+        sessionlessTwo.close();
+
+        cluster.close();
+    }
+
     private void assertFutureTimeout(final CompletableFuture<List<Result>> futureFirst) {
         try
         {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0c871715/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
index 3e1b7e9..887d408 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
@@ -23,7 +23,6 @@ import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
-import org.ietf.jgss.GSSException;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -128,7 +127,7 @@ public class GremlinServerAuthIntegrateTest extends AbstractGremlinServerIntegra
             fail("This should not succeed as the client did not provide credentials");
         } catch(Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertEquals(GSSException.class, root.getClass());
+            assertEquals(ResponseException.class, root.getClass());
         } finally {
             cluster.close();
         }
@@ -157,7 +156,7 @@ public class GremlinServerAuthIntegrateTest extends AbstractGremlinServerIntegra
         final Client client = cluster.connect();
 
         try {
-            client.submit("1+1").all();
+            client.submit("1+1").all().get();
         } catch(Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
             assertEquals(ResponseException.class, root.getClass());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0c871715/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthOldIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthOldIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthOldIntegrateTest.java
index f2e5622..2f332be 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthOldIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthOldIntegrateTest.java
@@ -131,7 +131,7 @@ public class GremlinServerAuthOldIntegrateTest extends AbstractGremlinServerInte
             fail("This should not succeed as the client did not provide credentials");
         } catch(Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertEquals(GSSException.class, root.getClass());
+            assertEquals(ResponseException.class, root.getClass());
 
             // removed this assert as the text of the message changes based on kerberos config - stupid kerberos
             // assertThat(root.getMessage(), startsWith("Invalid name provided"));
@@ -163,7 +163,7 @@ public class GremlinServerAuthOldIntegrateTest extends AbstractGremlinServerInte
         final Client client = cluster.connect();
 
         try {
-            client.submit("1+1").all();
+            client.submit("1+1").all().get();
         } catch(Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
             assertEquals(ResponseException.class, root.getClass());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0c871715/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
index 99b3a1b..3c1fef9 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
@@ -26,7 +26,6 @@ import org.apache.tinkerpop.gremlin.driver.Cluster;
 import org.apache.tinkerpop.gremlin.driver.Result;
 import org.apache.tinkerpop.gremlin.driver.ResultSet;
 import org.apache.tinkerpop.gremlin.driver.Tokens;
-import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
@@ -189,7 +188,7 @@ public class GremlinServerSessionIntegrateTest  extends AbstractGremlinServerInt
             fail("Session should be dead");
         } catch (Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertThat(root, instanceOf(ConnectionException.class));
+            assertThat(root, instanceOf(IllegalStateException.class));
         } finally {
             cluster.close();
         }
@@ -229,7 +228,8 @@ public class GremlinServerSessionIntegrateTest  extends AbstractGremlinServerInt
             cluster.close();
         }
 
-        assertEquals(1, recordingAppender.getMessages().stream()
+        // there will be one for the timeout and a second for closing the cluster
+        assertEquals(2, recordingAppender.getMessages().stream()
                 .filter(msg -> msg.equals("INFO - Session shouldHaveTheSessionTimeout closed\n")).count());
     }
 


[2/3] tinkerpop git commit: TINKERPOP-1460 Deprecate reconnectInitialDelay on Java driver

Posted by sp...@apache.org.
TINKERPOP-1460 Deprecate reconnectInitialDelay on Java driver

The deprecated setting is one of the less impactful ones and should be of little concern to users insofar as affecting driver performance or usage. CTR


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/d1ebe5fc
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/d1ebe5fc
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/d1ebe5fc

Branch: refs/heads/TINKERPOP-1467-master
Commit: d1ebe5fcf935f4f7020a44e158a56b1591005913
Parents: 20c5f84
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Thu Sep 29 12:26:56 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Sep 29 12:26:56 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                                          | 1 +
 docs/src/upgrade/release-3.2.x-incubating.asciidoc          | 9 +++++++++
 .../java/org/apache/tinkerpop/gremlin/driver/Cluster.java   | 3 +++
 .../org/apache/tinkerpop/gremlin/driver/Connection.java     | 5 +++++
 .../main/java/org/apache/tinkerpop/gremlin/driver/Host.java | 2 +-
 5 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d1ebe5fc/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 47d4bbf..cced503 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -55,6 +55,7 @@ TinkerPop 3.2.3 (Release Date: NOT OFFICIALLY RELEASED YET)
 * Added "keep-alive" functionality to the Java driver, which will send a heartbeat to the server when normal request activity on a connection stops for a period of time.
 * Renamed the `empty.result.indicator` preference to `result.indicator.null` in Gremlin Console
 * If `result.indicator.null` is set to an empty string, then no "result line" is printed in Gremlin Console.
+* Deprecated `reconnectInitialDelay` on the Java driver.
 * VertexPrograms can now declare traverser requirements, e.g. to have access to the path when used with `.program()`.
 * New build options for `gremlin-python` where `-DglvPython` is no longer required.
 * Added missing `InetAddress` to GraphSON extension module.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d1ebe5fc/docs/src/upgrade/release-3.2.x-incubating.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/upgrade/release-3.2.x-incubating.asciidoc b/docs/src/upgrade/release-3.2.x-incubating.asciidoc
index a6be1c6..cc1cf2e 100644
--- a/docs/src/upgrade/release-3.2.x-incubating.asciidoc
+++ b/docs/src/upgrade/release-3.2.x-incubating.asciidoc
@@ -118,6 +118,15 @@ gremlin> g.V().hasLabel("software").count()
 TinkerPop 3.2.3 fixes this misbehavior and all `has()` method overloads behave like before, except that they no longer
 support no arguments.
 
+Deprecated reconnectInitialDelay
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The `reconnectInitialDelay` setting on the `Cluster` builder has been deprecated. It no longer serves any purpose.
+The value for the "initial delay" now comes from `reconnectInterval` (there are no longer two separate settings to
+control).
+
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-1460[TINKERPOP-1460]
+
 Upgrading for Providers
 ~~~~~~~~~~~~~~~~~~~~~~~
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d1ebe5fc/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index f79e719..f420de0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -732,7 +732,10 @@ public final class Cluster {
 
         /**
          * Time in milliseconds to wait before attempting to reconnect to a dead host after it has been marked dead.
+         *
+         * @deprecated As of release 3.2.3, the value of the initial delay is now the same as the {@link #reconnectInterval}.
          */
+        @Deprecated
         public Builder reconnectIntialDelay(final int initialDelay) {
             this.reconnectInitialDelay = initialDelay;
             return this;

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d1ebe5fc/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 1ef9b98..02ec0b3 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -63,6 +63,11 @@ final class Connection {
     public static final int MAX_WAIT_FOR_CONNECTION = 3000;
     public static final int MAX_WAIT_FOR_SESSION_CLOSE = 3000;
     public static final int MAX_CONTENT_LENGTH = 65536;
+
+    /**
+     * @deprecated As of release 3.2.3, replaced by {@link #RECONNECT_INTERVAL}.
+     */
+    @Deprecated
     public static final int RECONNECT_INITIAL_DELAY = 1000;
     public static final int RECONNECT_INTERVAL = 1000;
     public static final int RESULT_ITERATION_BATCH_SIZE = 64;

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d1ebe5fc/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
index e21d9be..52f4d78 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
@@ -76,7 +76,7 @@ public final class Host {
             retryThread = this.cluster.executor().scheduleAtFixedRate(() -> {
                     logger.debug("Trying to reconnect to dead host at {}", this);
                     if (reconnect.apply(this)) reconnected();
-                }, cluster.connectionPoolSettings().reconnectInitialDelay,
+                }, cluster.connectionPoolSettings().reconnectInterval,
                 cluster.connectionPoolSettings().reconnectInterval, TimeUnit.MILLISECONDS);
         }
     }


[3/3] tinkerpop git commit: Merge remote-tracking branch 'origin/TINKERPOP-1467' into TINKERPOP-1467-master

Posted by sp...@apache.org.
Merge remote-tracking branch 'origin/TINKERPOP-1467' into TINKERPOP-1467-master

Conflicts:
	gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
	gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
	gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
	gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/c5886689
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/c5886689
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/c5886689

Branch: refs/heads/TINKERPOP-1467-master
Commit: c58866892c13c778d07fd40c192ce917ad103217
Parents: d1ebe5f 0c87171
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Thu Sep 29 12:49:19 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Sep 29 12:49:19 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  1 +
 .../upgrade/release-3.1.x-incubating.asciidoc   | 17 +++++
 .../tinkerpop/gremlin/driver/Channelizer.java   |  2 +-
 .../apache/tinkerpop/gremlin/driver/Client.java | 42 +++++++++--
 .../tinkerpop/gremlin/driver/Cluster.java       | 24 +++++-
 .../tinkerpop/gremlin/driver/Connection.java    | 56 ++++++++++++--
 .../gremlin/driver/ConnectionPool.java          | 21 +++---
 .../tinkerpop/gremlin/driver/Handler.java       | 26 +++++--
 .../tinkerpop/gremlin/driver/ResultQueue.java   |  4 +
 .../driver/handler/WebSocketClientHandler.java  |  4 +-
 .../server/GremlinDriverIntegrateTest.java      | 79 +++++++++++++++++++-
 .../server/GremlinServerAuthIntegrateTest.java  |  5 +-
 .../GremlinServerAuthOldIntegrateTest.java      |  4 +-
 .../GremlinServerSessionIntegrateTest.java      |  6 +-
 14 files changed, 247 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c5886689/CHANGELOG.asciidoc
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c5886689/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c5886689/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
----------------------------------------------------------------------
diff --cc gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 7b9262e,3a03141..bd397a1
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@@ -350,11 -295,17 +355,17 @@@ public abstract class Client 
      public final static class ClusteredClient extends Client {
  
          private ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>();
+         private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null);
  
 -        ClusteredClient(final Cluster cluster) {
 -            super(cluster);
 +        ClusteredClient(final Cluster cluster, final Client.Settings settings) {
 +            super(cluster, settings);
          }
  
+         @Override
+         public boolean isClosing() {
+             return closing.get() != null;
+         }
+ 
          /**
           * Submits a Gremlin script to the server and returns a {@link ResultSet} once the write of the request is
           * complete.
@@@ -650,10 -503,12 +670,12 @@@
  
          private ConnectionPool connectionPool;
  
+         private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null);
+ 
 -        SessionedClient(final Cluster cluster, final String sessionId, final boolean manageTransactions) {
 -            super(cluster);
 -            this.sessionId = sessionId;
 -            this.manageTransactions = manageTransactions;
 +        SessionedClient(final Cluster cluster, final Client.Settings settings) {
 +            super(cluster, settings);
 +            this.sessionId = settings.getSession().get().sessionId;
 +            this.manageTransactions = settings.getSession().get().manageTransactions;
          }
  
          String getSessionId() {
@@@ -697,134 -557,13 +724,139 @@@
           * Close the bound {@link ConnectionPool}.
           */
          @Override
-         public CompletableFuture<Void> closeAsync() {
-             return connectionPool.closeAsync();
+         public synchronized CompletableFuture<Void> closeAsync() {
+             if (closing.get() != null)
+                 return closing.get();
+ 
+             final CompletableFuture<Void> connectionPoolClose = connectionPool.closeAsync();
+             closing.set(connectionPoolClose);
+             return connectionPoolClose;
          }
      }
 +
 +    /**
 +     * Settings given to {@link Cluster#connect(Settings)} that configures how a {@link Client} will behave.
 +     */
 +    public static class Settings {
 +        private final Optional<SessionSettings> session;
 +
 +        private Settings(final Builder builder) {
 +            this.session = builder.session;
 +        }
 +
 +        public static Builder build() {
 +            return new Builder();
 +        }
 +
 +        /**
 +         * Determines if the {@link Client} is to be constructed with a session. If the value is present, then a
 +         * session is expected.
 +         */
 +        public Optional<SessionSettings> getSession() {
 +            return session;
 +        }
 +
 +        public static class Builder {
 +            private Optional<SessionSettings> session = Optional.empty();
 +
 +            private Builder() {}
 +
 +            /**
 +             * Enables a session. By default this will create a random session name and configure transactions to be
 +             * unmanaged. This method will override settings provided by calls to the other overloads of
 +             * {@code useSession}.
 +             */
 +            public Builder useSession(final boolean enabled) {
 +                session = enabled ? Optional.of(SessionSettings.build().create()) : Optional.empty();
 +                return this;
 +            }
 +
 +            /**
 +             * Enables a session. By default this will create a session with the provided name and configure
 +             * transactions to be unmanaged. This method will override settings provided by calls to the other
 +             * overloads of {@code useSession}.
 +             */
 +            public Builder useSession(final String sessionId) {
 +                session = sessionId != null && !sessionId.isEmpty() ?
 +                        Optional.of(SessionSettings.build().sessionId(sessionId).create()) : Optional.empty();
 +                return this;
 +            }
 +
 +            /**
 +             * Enables a session. This method will override settings provided by calls to the other overloads of
 +             * {@code useSession}.
 +             */
 +            public Builder useSession(final SessionSettings settings) {
 +                session = Optional.ofNullable(settings);
 +                return this;
 +            }
 +
 +            public Settings create() {
 +                return new Settings(this);
 +            }
 +
 +        }
 +    }
 +
 +    /**
 +     * Settings for a {@link Client} that involve a session.
 +     */
 +    public static class SessionSettings {
 +        private final boolean manageTransactions;
 +        private final String sessionId;
 +
 +        private SessionSettings(final Builder builder) {
 +            manageTransactions = builder.manageTransactions;
 +            sessionId = builder.sessionId;
 +        }
 +
 +        /**
 +         * If enabled, transactions will be "managed" such that each request will represent a complete transaction.
 +         */
 +        public boolean manageTransactions() {
 +            return manageTransactions;
 +        }
 +
 +        /**
 +         * Provides the identifier of the session.
 +         */
 +        public String getSessionId() {
 +            return sessionId;
 +        }
 +
 +        public static SessionSettings.Builder build() {
 +            return new SessionSettings.Builder();
 +        }
 +
 +        public static class Builder {
 +            private boolean manageTransactions = false;
 +            private String sessionId = UUID.randomUUID().toString();
 +
 +            private Builder() {}
 +
 +            /**
 +             * If enabled, transactions will be "managed" such that each request will represent a complete transaction.
 +             * By default this value is {@code false}.
 +             */
 +            public Builder manageTransactions(final boolean manage) {
 +                manageTransactions = manage;
 +                return this;
 +            }
 +
 +            /**
 +             * Provides the identifier of the session. This value cannot be null or empty. By default it is set to
 +             * a random {@code UUID}.
 +             */
 +            public Builder sessionId(final String sessionId) {
 +                if (null == sessionId || sessionId.isEmpty())
 +                    throw new IllegalArgumentException("sessionId cannot be null or empty");
 +                this.sessionId = sessionId;
 +                return this;
 +            }
 +
 +            public SessionSettings create() {
 +                return new SessionSettings(this);
 +            }
 +        }
 +    }
  }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c5886689/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
----------------------------------------------------------------------
diff --cc gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index f420de0,473991a..f455029
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@@ -84,7 -84,9 +85,9 @@@ public final class Cluster 
       * submitted or can be directly initialized via {@link Client#init()}.
       */
      public <T extends Client> T connect() {
-         return (T) new Client.ClusteredClient(this, Client.Settings.build().create());
 -        final Client client = new Client.ClusteredClient(this);
++        final Client client =  new Client.ClusteredClient(this, Client.Settings.build().create());
+         manager.trackClient(client);
+         return (T) client;
      }
  
      /**
@@@ -121,19 -123,11 +124,21 @@@
       * @param manageTransactions enables auto-transactions when set to true
       */
      public <T extends Client> T connect(final String sessionId, final boolean manageTransactions) {
 -        if (null == sessionId || sessionId.isEmpty())
 -            throw new IllegalArgumentException("sessionId cannot be null or empty");
 -        final Client client = new Client.SessionedClient(this, sessionId, manageTransactions);
 +        final Client.SessionSettings sessionSettings = Client.SessionSettings.build()
 +                .manageTransactions(manageTransactions)
 +                .sessionId(sessionId).create();
 +        final Client.Settings settings = Client.Settings.build().useSession(sessionSettings).create();
 +        return connect(settings);
 +    }
 +
 +    /**
 +     * Creates a new {@link Client} based on the settings provided.
 +     */
 +    public <T extends Client> T connect(final Client.Settings settings) {
-         return settings.getSession().isPresent() ? (T) new Client.SessionedClient(this, settings) :
-                 (T) new Client.ClusteredClient(this, settings);
++        final Client client = settings.getSession().isPresent() ? new Client.SessionedClient(this, settings) :
++                new Client.ClusteredClient(this, settings);
+         manager.trackClient(client);
+         return (T) client;
      }
  
      @Override
@@@ -865,12 -687,10 +870,14 @@@
  
          private final ScheduledExecutorService executor;
  
 +        private final int nioPoolSize;
 +        private final int workerPoolSize;
 +        private final int port;
 +
          private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
  
+         private final List<WeakReference<Client>> openedClients = new ArrayList<>();
+ 
          private Manager(final Builder builder) {
              this.loadBalancingStrategy = builder.loadBalancingStrategy;
              this.authProps = builder.authProps;

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c5886689/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --cc gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 02ec0b3,766db2e..d197858
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@@ -158,15 -150,12 +159,16 @@@ final class Connection 
          return pending;
      }
  
-     public CompletableFuture<Void> closeAsync() {
+     public synchronized CompletableFuture<Void> closeAsync() {
+         if (isClosed()) return closeFuture.get();
+ 
          final CompletableFuture<Void> future = new CompletableFuture<>();
-         if (!closeFuture.compareAndSet(null, future))
-             return closeFuture.get();
+         closeFuture.set(future);
  
 +        // stop any pings being sent at the server for keep-alive
 +        final ScheduledFuture keepAlive = keepAliveFuture.get();
 +        if (keepAlive != null) keepAlive.cancel(true);
 +
          // make sure all requests in the queue are fully processed before killing.  if they are then shutdown
          // can be immediate.  if not this method will signal the readCompleted future defined in the write()
          // operation to check if it can close.  in this way the connection no longer receives writes, but

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c5886689/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c5886689/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c5886689/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c5886689/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c5886689/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --cc gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 96cde54,bf66b0d..d8aff4a
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@@ -26,9 -25,7 +26,8 @@@ import org.apache.tinkerpop.gremlin.dri
  import org.apache.tinkerpop.gremlin.driver.Cluster;
  import org.apache.tinkerpop.gremlin.driver.Result;
  import org.apache.tinkerpop.gremlin.driver.ResultSet;
- import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
  import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 +import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
  import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
  import org.apache.tinkerpop.gremlin.driver.ser.JsonBuilderGryoSerializer;
  import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0;