Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2019/09/30 13:03:22 UTC

[tinkerpop] branch driver-35 updated (6cdfe22 -> d33989b)

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a change to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git.


 discard 6cdfe22  Change connection management to single request per channel
     add efde61e  Resolved some dependency problems from #1199 and #1200 after merge from tp33 CTR
     add 67d55c9  Merge branch 'tp34'
     new d33989b  Change connection management to single request per channel

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (6cdfe22)
            \
             N -- N -- N   refs/heads/driver-35 (d33989b)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 hadoop-gremlin/pom.xml | 2 +-
 spark-gremlin/pom.xml  | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)


[tinkerpop] 01/01: Change connection management to single request per channel

Posted by sp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit d33989b7fe32f874e4699e515891674b79f3ed89
Author: Divij Vaidya <di...@gmail.com>
AuthorDate: Tue Jun 18 00:33:40 2019 -0700

    Change connection management to single request per channel
---
 docs/src/reference/gremlin-variants.asciidoc       |  16 +-
 .../tinkerpop/gremlin/driver/Channelizer.java      | 134 ++++--
 .../apache/tinkerpop/gremlin/driver/Client.java    | 173 +++++--
 .../apache/tinkerpop/gremlin/driver/Cluster.java   |  99 ++--
 .../tinkerpop/gremlin/driver/Connection.java       | 411 ++--------------
 .../tinkerpop/gremlin/driver/ConnectionPool.java   | 530 +++------------------
 .../gremlin/driver/ConnectionPoolImpl.java         | 246 ++++++++++
 .../apache/tinkerpop/gremlin/driver/Handler.java   | 111 +++--
 .../org/apache/tinkerpop/gremlin/driver/Host.java  |   4 +
 .../tinkerpop/gremlin/driver/ResultQueue.java      |  13 +
 .../apache/tinkerpop/gremlin/driver/ResultSet.java |   7 +
 .../apache/tinkerpop/gremlin/driver/Settings.java  |  55 ++-
 .../gremlin/driver/SingleRequestConnection.java    | 219 +++++++++
 .../gremlin/driver/TinkerpopFixedChannelPool.java  | 508 ++++++++++++++++++++
 .../driver/handler/WebSocketClientHandler.java     |   7 +-
 .../driver/handler/WebSocketIdleEventHandler.java  |  58 +++
 .../driver/handler/WebsocketCloseHandler.java      |  54 +++
 .../gremlin/driver/simple/AbstractClient.java      |   1 +
 .../gremlin/driver/ClusterBuilderTest.java         |  13 +-
 .../driver/ClientConnectionIntegrateTest.java      | 109 -----
 ...ClientSingleRequestConnectionIntegrateTest.java | 452 ++++++++++++++++++
 .../gremlin/server/GremlinDriverIntegrateTest.java | 148 +++---
 .../server/GremlinServerAuthIntegrateTest.java     |  64 ++-
 .../server/GremlinServerAuthKrb5IntegrateTest.java |  77 ++-
 .../gremlin/server/GremlinServerIntegrateTest.java |  24 +-
 .../server/GremlinServerSessionIntegrateTest.java  |  60 ++-
 26 files changed, 2309 insertions(+), 1284 deletions(-)

diff --git a/docs/src/reference/gremlin-variants.asciidoc b/docs/src/reference/gremlin-variants.asciidoc
index 08382fc..5aea700 100644
--- a/docs/src/reference/gremlin-variants.asciidoc
+++ b/docs/src/reference/gremlin-variants.asciidoc
@@ -243,14 +243,11 @@ The following table describes the various configuration options for the Gremlin
 |connectionPool.keyStorePassword |The password of the `keyStore` if it is password-protected. |_none_
 |connectionPool.keyStoreType |`JKS` (Java 8 default) or `PKCS12` (Java 9+ default)|_none_
 |connectionPool.maxContentLength |The maximum length in bytes of a message that can be sent to the server. This number can be no greater than the setting of the same name in the server configuration. |65536
-|connectionPool.maxInProcessPerConnection |The maximum number of in-flight requests that can occur on a connection. |4
-|connectionPool.maxSimultaneousUsagePerConnection |The maximum number of times that a connection can be borrowed from the pool simultaneously. |16
-|connectionPool.maxSize |The maximum size of a connection pool for a host. |8
+|connectionPool.maxInProcessPerConnection |The maximum number of in-flight requests that can occur on a connection. This setting is deprecated as of 3.4.3 and its functionality has been rolled into maxSize. For backward compatibility it is still used to approximate the amount of parallelism required. In future versions, the approximation logic will be removed and the dependency on this parameter will be eliminated entirely. To disable the dependency on this parameter right now, set it to 0.|4
+|connectionPool.maxSimultaneousUsagePerConnection |The maximum number of times that a connection can be borrowed from the pool simultaneously. This setting is deprecated as of 3.4.3 and its functionality has been rolled into maxSize. For backward compatibility it is still used to approximate the amount of parallelism required. In future versions, the approximation logic will be removed and the dependency on this parameter will be eliminated entirely. To disable the dependency on this parameter right now, set it to 0.|16
+|connectionPool.maxSize |The maximum number of parallel requests that can be made to the server. This is the only setting required to control the number of concurrent requests the client makes to the server. |8
 |connectionPool.maxWaitForConnection |The amount of time in milliseconds to wait for a new connection before timing out. |3000
 |connectionPool.maxWaitForSessionClose |The amount of time in milliseconds to wait for a session to close before timing out (does not apply to sessionless connections). |3000
-|connectionPool.minInProcessPerConnection |The minimum number of in-flight requests that can occur on a connection. |1
-|connectionPool.minSimultaneousUsagePerConnection |The maximum number of times that a connection can be borrowed from the pool simultaneously. |8
-|connectionPool.minSize |The minimum size of a connection pool for a host. |2
 |connectionPool.reconnectInterval |The amount of time in milliseconds to wait before trying to reconnect to a dead host. |1000
 |connectionPool.resultIterationBatchSize |The override value for the size of the result batches to be returned from the server. |64
 |connectionPool.sslCipherSuites |The list of JSSE ciphers to support for SSL connections. If specified, only the ciphers that are listed and supported will be enabled. If not specified, the JVM default is used.  |_none_
@@ -274,6 +271,11 @@ The following table describes the various configuration options for the Gremlin
 
 Please see the link:http://tinkerpop.apache.org/javadocs/x.y.z/core/org/apache/tinkerpop/gremlin/driver/Cluster.Builder.html[Cluster.Builder javadoc] to get more information on these settings.
 
+==== Choosing a value for connectionPool.maxSize
+
+`connectionPool.maxSize` represents the maximum number of concurrent requests that the client can make to the server. Each request is made on its own WebSocket connection, so this parameter also controls the maximum number of WebSocket connections that can be open to the server at one time.
+
+When choosing a value for this parameter, determine how many requests you anticipate running in parallel from your client. Requests beyond this number will begin to fail with timeout exceptions, and those timeouts should be handled at the application layer.
 === Serialization
 
 Remote systems like Gremlin Server and Remote Gremlin Providers respond to requests made in a particular serialization
@@ -1102,7 +1104,7 @@ g.V().Out().Map<int>(Lambda.Groovy("it.get().value('name').length()")).Sum<int>(
 g.V().Out().Map<int>(Lambda.Python("lambda x: len(x.get().value('name'))")).Sum<int>().ToList(); <2>
 ----
 
-<1> `Lambda.Groovy()` can be used to create a Groovy lambda. 
+<1> `Lambda.Groovy()` can be used to create a Groovy lambda.
 <2> `Lambda.Python()` can be used to create a Python lambda.
 
 The `ILambda` interface returned by these two methods inherits interfaces like `IFunction` and `IPredicate` that mirror
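For context on the `connectionPool.maxSize` guidance above: the sketch below (an illustration, not part of this commit; the host, port and query are placeholder values) shows the pool being sized from application code and the resulting timeouts being handled at the application layer.

[source,java]
----
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;

public class MaxSizeExample {
    public static void main(final String[] args) {
        final Cluster cluster = Cluster.build("localhost")
                .port(8182)
                .maxConnectionPoolSize(16)  // at most 16 concurrent requests, i.e. 16 WebSocket connections
                .create();
        final Client client = cluster.connect();
        try {
            client.submit("g.V().count()").all().join();
        } catch (CompletionException ce) {
            // requests beyond maxSize surface as timeouts - retry, shed load, or raise maxSize
            if (ce.getCause() instanceof TimeoutException) {
                // application-level handling goes here
            }
        } finally {
            cluster.close();
        }
    }
}
----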
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 4b39efc..723c1c5 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -19,29 +19,33 @@
 package org.apache.tinkerpop.gremlin.driver;
 
 import io.netty.channel.Channel;
-import io.netty.handler.codec.http.EmptyHttpHeaders;
-import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
-import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
-import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinRequestEncoder;
-import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinResponseDecoder;
-import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
-import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
-import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.EmptyHttpHeaders;
 import io.netty.handler.codec.http.HttpClientCodec;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
 import io.netty.handler.codec.http.websocketx.WebSocketVersion;
 import io.netty.handler.ssl.SslContext;
+import io.netty.handler.timeout.IdleStateHandler;
+
+import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinRequestEncoder;
+import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinResponseDecoder;
+import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
+import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
+import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
+import org.apache.tinkerpop.gremlin.driver.handler.WebSocketIdleEventHandler;
+import org.apache.tinkerpop.gremlin.driver.handler.WebsocketCloseHandler;
 
 import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.lang.Math.toIntExact;
 
 /**
  * Client-side channel initializer interface.  It is responsible for constructing the Netty {@code ChannelPipeline}
@@ -53,9 +57,15 @@ public interface Channelizer extends ChannelHandler {
 
     /**
      * Initializes the {@code Channelizer}. Called just after construction.
+     * @param connection the {@link Connection} to bind this {@code Channelizer} to
+     *
+     * @deprecated As of release 3.4.3, replaced by {@link #init(ConnectionPool)}.
      */
+    @Deprecated
     public void init(final Connection connection);
 
+    public default void init(final ConnectionPool connectionPool) { throw new UnsupportedOperationException(); }
+
     /**
      * Called on {@link Connection#close()} to perform an {@code Channelizer} specific functions.  Note that the
      * {@link Connection} already calls {@code Channel.close()} so there is no need to call that method here.
@@ -65,7 +75,7 @@ public interface Channelizer extends ChannelHandler {
     public void close(final Channel channel);
 
     /**
-     * Create a message for the driver to use as a "keep-alive" for the connection. This method will only be used if
+     * Create a message for the driver to use as a "keep-alive" for the channel. This method will only be used if
      * {@link #supportsKeepAlive()} is {@code true}.
      */
     public default Object createKeepAliveMessage() {
@@ -73,7 +83,7 @@ public interface Channelizer extends ChannelHandler {
     }
 
     /**
-     * Determines if the channelizer supports a method for keeping the connection to the server alive.
+     * Determines if the channelizer supports a method for keeping the channel to the server alive.
      */
     public default boolean supportsKeepAlive() {
         return false;
@@ -82,17 +92,23 @@ public interface Channelizer extends ChannelHandler {
     /**
      * Called after the channel connects. The {@code Channelizer} may need to perform some functions, such as a
      * handshake.
+     *
+     * @deprecated As of release 3.4.3, replaced by {@link #connected(Channel)}.
      */
+    @Deprecated
     public default void connected() {
     }
 
+    public default void connected(final Channel ch) {
+    }
+
     /**
      * Base implementation of the client side {@link Channelizer}.
      */
     abstract class AbstractChannelizer extends ChannelInitializer<SocketChannel> implements Channelizer {
-        protected Connection connection;
+        protected ConnectionPool connectionPool;
         protected Cluster cluster;
-        private ConcurrentMap<UUID, ResultQueue> pending;
+        protected Handler.GremlinResponseHandler gremlinResponseHandler;
 
         protected static final String PIPELINE_GREMLIN_SASL_HANDLER = "gremlin-sasl-handler";
         protected static final String PIPELINE_GREMLIN_HANDLER = "gremlin-handler";
@@ -114,32 +130,39 @@ public interface Channelizer extends ChannelHandler {
 
         @Override
         public void init(final Connection connection) {
-            this.connection = connection;
-            this.cluster = connection.getCluster();
-            this.pending = connection.getPending();
+            // do nothing
         }
 
         @Override
-        protected void initChannel(final SocketChannel socketChannel) throws Exception {
+        public void init(final ConnectionPool connPool) {
+            this.connectionPool = connPool;
+            this.cluster = connPool.getCluster();
+            this.gremlinResponseHandler = new Handler.GremlinResponseHandler();
+        }
+
+        @Override
+        protected void initChannel(final SocketChannel socketChannel) {
             final ChannelPipeline pipeline = socketChannel.pipeline();
-            final Optional<SslContext> sslCtx;
+            final Optional<SslContext> sslCtxOpt;
             if (supportsSsl()) {
                 try {
-                    sslCtx = Optional.of(cluster.createSSLContext());
+                    sslCtxOpt = Optional.of(cluster.createSSLContext());
                 } catch (Exception ex) {
                     throw new RuntimeException(ex);
                 }
             } else {
-                sslCtx = Optional.empty();
+                sslCtxOpt = Optional.empty();
             }
 
-            if (sslCtx.isPresent()) {
-                pipeline.addLast(sslCtx.get().newHandler(socketChannel.alloc(), connection.getUri().getHost(), connection.getUri().getPort()));
-            }
+            sslCtxOpt.ifPresent((sslCtx) -> {
+                pipeline.addLast(sslCtx.newHandler(socketChannel.alloc(),
+                                                   connectionPool.getHost().getHostUri().getHost(),
+                                                   connectionPool.getHost().getHostUri().getPort()));
+            });
 
             configure(pipeline);
             pipeline.addLast(PIPELINE_GREMLIN_SASL_HANDLER, new Handler.GremlinSaslAuthenticationHandler(cluster.authProperties()));
-            pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new Handler.GremlinResponseHandler(pending));
+            pipeline.addLast(PIPELINE_GREMLIN_HANDLER, gremlinResponseHandler);
         }
     }
 
@@ -147,16 +170,22 @@ public interface Channelizer extends ChannelHandler {
      * WebSocket {@link Channelizer} implementation.
      */
     public final class WebSocketChannelizer extends AbstractChannelizer {
-        private WebSocketClientHandler handler;
 
         private WebSocketGremlinRequestEncoder webSocketGremlinRequestEncoder;
         private WebSocketGremlinResponseDecoder webSocketGremlinResponseDecoder;
+        private WebSocketIdleEventHandler webSocketIdleEventHandler;
 
         @Override
-        public void init(final Connection connection) {
-            super.init(connection);
+        public void init(final Connection connection) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void init(final ConnectionPool connpool) {
+            super.init(connpool);
             webSocketGremlinRequestEncoder = new WebSocketGremlinRequestEncoder(true, cluster.getSerializer());
             webSocketGremlinResponseDecoder = new WebSocketGremlinResponseDecoder(cluster.getSerializer());
+            webSocketIdleEventHandler = new WebSocketIdleEventHandler(connpool.getActiveChannels());
         }
 
         /**
@@ -170,27 +199,14 @@ public interface Channelizer extends ChannelHandler {
         }
 
         @Override
-        public Object createKeepAliveMessage() {
-            return new PingWebSocketFrame();
-        }
-
-        /**
-         * Sends a {@code CloseWebSocketFrame} to the server for the specified channel.
-         */
-        @Override
-        public void close(final Channel channel) {
-            if (channel.isOpen()) channel.writeAndFlush(new CloseWebSocketFrame());
-        }
-
-        @Override
         public boolean supportsSsl() {
-            final String scheme = connection.getUri().getScheme();
+            final String scheme = connectionPool.getHost().getHostUri().getScheme();
             return "wss".equalsIgnoreCase(scheme);
         }
 
         @Override
         public void configure(final ChannelPipeline pipeline) {
-            final String scheme = connection.getUri().getScheme();
+            final String scheme = connectionPool.getHost().getHostUri().getScheme();
             if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme))
                 throw new IllegalStateException("Unsupported scheme (only ws: or wss: supported): " + scheme);
 
@@ -198,27 +214,41 @@ public interface Channelizer extends ChannelHandler {
                 throw new IllegalStateException("To use wss scheme ensure that enableSsl is set to true in configuration");
 
             final int maxContentLength = cluster.connectionPoolSettings().maxContentLength;
-            handler = new WebSocketClientHandler(
+            // TODO: Replace WebSocketClientHandler with Netty's WebSocketClientProtocolHandler
+            final WebSocketClientHandler handler = new WebSocketClientHandler(
                     WebSocketClientHandshakerFactory.newHandshaker(
-                            connection.getUri(), WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, maxContentLength));
+                            connectionPool.getHost().getHostUri(), WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, maxContentLength));
 
+            int keepAliveInterval = toIntExact(TimeUnit.SECONDS.convert(cluster.connectionPoolSettings().keepAliveInterval, TimeUnit.MILLISECONDS));
             pipeline.addLast("http-codec", new HttpClientCodec());
             pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength));
-            pipeline.addLast("ws-handler", handler);
+            pipeline.addLast("netty-idle-state-handler", new IdleStateHandler(0, keepAliveInterval, 0));
+            pipeline.addLast("ws-idle-handler", webSocketIdleEventHandler);
+            pipeline.addLast("ws-client-handler", handler);
+            pipeline.addLast("ws-close-handler", new WebsocketCloseHandler());
             pipeline.addLast("gremlin-encoder", webSocketGremlinRequestEncoder);
             pipeline.addLast("gremlin-decoder", webSocketGremlinResponseDecoder);
         }
 
+
         @Override
-        public void connected() {
+        public void connected(final Channel ch) {
             try {
                 // block for a few seconds - if the handshake takes longer than there's gotta be issues with that
                 // server. more than likely, SSL is enabled on the server, but the client forgot to enable it or
                 // perhaps the server is not configured for websockets.
-                handler.handshakeFuture().get(15000, TimeUnit.MILLISECONDS);
-            } catch (Exception ex) {
-                throw new RuntimeException(new ConnectionException(connection.getUri(),
-                        "Could not complete websocket handshake - ensure that client protocol matches server", ex));
+                ((WebSocketClientHandler) ch.pipeline().get("ws-client-handler")).handshakeFuture().addListener(f -> {
+                    if (!f.isSuccess()) {
+                        throw new ConnectionException(connectionPool.getHost().getHostUri(),
+                                "Could not complete websocket handshake - ensure that client protocol matches server", f.cause());
+                    }
+                }).get(15000, TimeUnit.MILLISECONDS);
+            } catch (ExecutionException ex) {
+                throw new RuntimeException(ex.getCause());
+            } catch (InterruptedException | TimeoutException ex) {
+                // catching InterruptedException clears the thread's interrupted flag - this is intentional here
+                throw new RuntimeException(new ConnectionException(connectionPool.getHost().getHostUri(),
+                        "Timed out while performing websocket handshake - ensure that client protocol matches server", ex));
             }
         }
     }
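The pipeline assembled above replaces the driver's hand-rolled keep-alive pings with Netty's `IdleStateHandler`, which fires an `IdleStateEvent` once `keepAliveInterval` elapses without a write. The sketch below shows how such an event can be translated into a WebSocket ping; it is an approximation of what a handler like `WebSocketIdleEventHandler` presumably does, not the committed implementation.

[source,java]
----
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

public class PingOnIdleHandler extends ChannelDuplexHandler {
    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
        // the upstream IdleStateHandler fires WRITER_IDLE after keepAliveInterval with no writes
        if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
            ctx.writeAndFlush(new PingWebSocketFrame());  // keep the channel alive
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
----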
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 0143753..2b4d0ed 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -18,7 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
-import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -29,6 +29,7 @@ import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.ConnectException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -83,7 +84,7 @@ public abstract class Client {
     /**
      * Chooses a {@link Connection} to write the message to.
      */
-    protected abstract Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException;
+    protected abstract CompletableFuture<Connection> chooseConnectionAsync(final RequestMessage msg);
 
     /**
      * Asynchronous close of the {@code Client}.
@@ -359,24 +360,29 @@ public abstract class Client {
             init();
 
         final CompletableFuture<ResultSet> future = new CompletableFuture<>();
-        Connection connection = null;
-        try {
-            // the connection is returned to the pool once the response has been completed...see Connection.write()
-            // the connection may be returned to the pool with the host being marked as "unavailable"
-            connection = chooseConnection(msg);
-            connection.write(msg, future);
-            return future;
-        } catch (TimeoutException toe) {
-            // there was a timeout borrowing a connection
-            throw new RuntimeException(toe);
-        } catch (ConnectionException ce) {
-            throw new RuntimeException(ce);
-        } catch (Exception ex) {
-            throw new RuntimeException(ex);
-        } finally {
-            if (logger.isDebugEnabled())
-                logger.debug("Submitted {} to - {}", msg, null == connection ? "connection not initialized" : connection.toString());
-        }
+        // the connection is returned to the pool once the response has been completed...see Connection.write()
+        chooseConnectionAsync(msg)
+                .whenCompleteAsync((conn, t) -> {
+                    if (t != null) {
+                        final Throwable cause = t.getCause();
+                        final Throwable rootCause = (null == cause) ? null : cause.getCause();
+                        if (cause instanceof IllegalStateException) {
+                            // complete the future exceptionally so that callers are not left waiting forever
+                            logger.error("Could not acquire a connection for request {}.", msg, cause);
+                            future.completeExceptionally(cause);
+                        } else if (rootCause instanceof TimeoutException || rootCause instanceof ConnectException) {
+                            future.completeExceptionally(rootCause);
+                        } else {
+                            future.completeExceptionally(t);
+                        }
+                    } else {
+                        conn.write(msg, future);
+                        logger.debug("Submitted {} to - {}", msg, conn);
+                    }
+                }, this.cluster.executor());
+        return future;
     }
 
     public abstract boolean isClosing();
@@ -478,25 +484,45 @@ public abstract class Client {
          * from that host's connection pool.
          */
         @Override
-        protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {
-            final Iterator<Host> possibleHosts;
-            if (msg.optionalArgs(Tokens.ARGS_HOST).isPresent()) {
-                // TODO: not sure what should be done if unavailable - select new host and re-submit traversal?
-                final Host host = (Host) msg.getArgs().get(Tokens.ARGS_HOST);
-                msg.getArgs().remove(Tokens.ARGS_HOST);
-                possibleHosts = IteratorUtils.of(host);
-            } else {
-                possibleHosts = this.cluster.loadBalancingStrategy().select(msg);
-            }
+        protected CompletableFuture<Connection> chooseConnectionAsync(final RequestMessage msg) {
+            return CompletableFuture.supplyAsync(() -> {
+                final Iterator<Host> possibleHosts;
+                if (msg.optionalArgs(Tokens.ARGS_HOST).isPresent()) {
+                    // TODO: not sure what should be done if unavailable - select new host and re-submit traversal?
+                    final Host host = (Host) msg.getArgs().get(Tokens.ARGS_HOST);
+                    msg.getArgs().remove(Tokens.ARGS_HOST);
+                    possibleHosts = IteratorUtils.of(host);
+                } else {
+                    possibleHosts = this.cluster.loadBalancingStrategy().select(msg);
+                }
 
-            // you can get no possible hosts in more than a few situations. perhaps the servers are just all down.
-            // or perhaps the client is not configured properly (disables ssl when ssl is enabled on the server).
-            if (!possibleHosts.hasNext())
-                throw new TimeoutException("Timed out while waiting for an available host - check the client configuration and connectivity to the server if this message persists");
+                int numTimeoutException = 0;
+
+                // you can get no possible hosts in more than a few situations. perhaps the servers are just all down.
+                // or perhaps the client is not configured properly (disables ssl when ssl is enabled on the server).
+                while (possibleHosts.hasNext()) {
+                    final Host bestHost = possibleHosts.next();
+                    final ConnectionPool pool = hostConnectionPools.get(bestHost);
+                    try {
+                        return pool.prepareConnection();
+                    } catch (TimeoutException ex) {
+                        logger.info("Timeout while borrowing connection from the {} for request {}. " +
+                                            "Consider increasing the max number of connections in the configuration."
+                                , bestHost, msg, ex);
+                        numTimeoutException++;
+                    } catch (ConnectException ex) {
+                        logger.warn("Unable to connect to the host {}. Check if the host is reachable.", bestHost, ex);
+                    } catch (Exception ex) {
+                        logger.warn("Error while borrowing connection from the {} for request {}", bestHost, msg, ex);
+                    }
+                }
 
-            final Host bestHost = possibleHosts.next();
-            final ConnectionPool pool = hostConnectionPools.get(bestHost);
-            return pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
+                if (numTimeoutException > 0) {
+                    throw new RuntimeException(new TimeoutException("Timed out while waiting for an available host for the request " + msg + ". Try again later."));
+                } else {
+                    throw new RuntimeException(new ConnectException("Unable to find a valid connection for the request " + msg));
+                }
+            }, this.cluster.executor());
         }
 
         /**
@@ -506,14 +532,14 @@ public abstract class Client {
         protected void initializeImplementation() {
             cluster.allHosts().forEach(host -> {
                 try {
-                    // hosts that don't initialize connection pools will come up as a dead host
-                    hostConnectionPools.put(host, new ConnectionPool(host, this));
+                    final ConnectionPool connectionPool = ConnectionPoolImpl.create(host, cluster);
+                    hostConnectionPools.put(host, connectionPool);
 
                     // added a new host to the cluster so let the load-balancer know
                     this.cluster.loadBalancingStrategy().onNew(host);
                 } catch (Exception ex) {
-                    // catch connection errors and prevent them from failing the creation
-                    logger.warn("Could not initialize connection pool for {} - will try later", host);
+                    // catch connection pool errors and prevent them from failing the creation
+                    logger.warn("Could not initialize connection pool for {} - will try later", host, ex);
                 }
             });
         }
@@ -623,9 +649,9 @@ public abstract class Client {
          * Delegates to the underlying {@link org.apache.tinkerpop.gremlin.driver.Client.ClusteredClient}.
          */
         @Override
-        protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {
+        protected CompletableFuture<Connection> chooseConnectionAsync(final RequestMessage msg) {
             if (close.isDone()) throw new IllegalStateException("Client is closed");
-            return client.chooseConnection(msg);
+            return client.chooseConnectionAsync(msg);
         }
 
         /**
@@ -689,10 +715,27 @@ public abstract class Client {
 
         /**
          * Since the session is bound to a single host, simply borrow a connection from that pool.
+         *
+         * @throws RuntimeException wrapping the actual cause (delivered through the returned future)
          */
         @Override
-        protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {
-            return connectionPool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
+        protected CompletableFuture<Connection> chooseConnectionAsync(final RequestMessage msg) {
+            return CompletableFuture.supplyAsync(() -> {
+                try {
+                    return connectionPool.prepareConnection();
+                } catch (TimeoutException e) {
+                    logger.info("Timeout while borrowing connection from the {} for request {}. " +
+                                        "Consider increasing the max number of connections in the configuration."
+                            , this.connectionPool.getHost(), msg, e);
+
+                    throw new RuntimeException(e);
+                } catch (ConnectException e) {
+                    logger.warn("Unable to connect to the host {}. Check if the host is reachable.",
+                                this.connectionPool.getHost(), e);
+
+                    throw new RuntimeException(e);
+                }
+            }, this.cluster.executor());
         }
 
         /**
@@ -702,11 +745,11 @@ public abstract class Client {
         protected void initializeImplementation() {
             // chooses an available host at random
             final List<Host> hosts = cluster.allHosts()
-                    .stream().filter(Host::isAvailable).collect(Collectors.toList());
+                                            .stream().filter(Host::isAvailable).collect(Collectors.toList());
             if (hosts.isEmpty()) throw new IllegalStateException("No available host in the cluster");
             Collections.shuffle(hosts);
             final Host host = hosts.get(0);
-            connectionPool = new ConnectionPool(host, this, Optional.of(1), Optional.of(1));
+            connectionPool = ConnectionPoolImpl.create(host, cluster);
         }
 
         @Override
@@ -724,10 +767,38 @@ public abstract class Client {
 
             // the connection pool may not have been initialized if requests weren't sent across it. in those cases
             // we just need to return a pre-completed future
-            final CompletableFuture<Void> connectionPoolClose = null == connectionPool ?
-                    CompletableFuture.completedFuture(null) : connectionPool.closeAsync();
-            closing.set(connectionPoolClose);
-            return connectionPoolClose;
+            if (connectionPool == null) {
+                closing.set(CompletableFuture.completedFuture(null));
+                return closing.get();
+            }
+
+            final boolean forceClose = this.settings.getSession().get().isForceClosed();
+            final RequestMessage closeMessage = buildMessage(RequestMessage.build(Tokens.OPS_CLOSE)
+                                                                           .addArg(Tokens.ARGS_FORCE, forceClose)).create();
+
+            final CompletableFuture<Void> sessionClose = submitAsync(closeMessage).thenCompose(s -> connectionPool.closeAsync());
+            closing.set(sessionClose);
+
+            return sessionClose;
+        }
+
+        @Override
+        public void close() {
+            try {
+                closeAsync().get(cluster.connectionPoolSettings().maxWaitForSessionClose, TimeUnit.MILLISECONDS);
+            } catch (TimeoutException e) {
+                final String msg = String.format(
+                        "Timeout while trying to close connection on %s - force closing - server will close session on shutdown or expiration.",
+                        this.getSessionId());
+                logger.warn(msg, e);
+            } catch (Exception ex) {
+                final String msg = String.format(
+                        "Encountered an error trying to close connection on %s - force closing - server will close session on shutdown or expiration.",
+                        this.getSessionId());
+                logger.warn(msg, ex);
+            } finally {
+                connectionPool.closeAsync().join();
+            }
         }
     }
 
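The error handling in `submitAsync()` and `chooseConnectionAsync()` above repeatedly walks layered `RuntimeException`/`ExecutionException` causes to surface the root `TimeoutException` or `ConnectException`. A helper of the kind one might extract for that pattern (illustrative only; the name and placement are assumptions):

[source,java]
----
import java.net.ConnectException;
import java.util.concurrent.TimeoutException;

final class Causes {
    private Causes() {}

    /** Walks the cause chain and returns the first TimeoutException or ConnectException, else the original throwable. */
    static Throwable unwrap(final Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof TimeoutException || cur instanceof ConnectException) return cur;
        }
        return t;
    }
}
----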
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index 70a53ad..8fcf838 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -18,19 +18,19 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
+import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.SslProvider;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import org.apache.commons.configuration2.Configuration;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -257,7 +257,7 @@ public final class Cluster {
     }
 
     public CompletableFuture<Void> closeAsync() {
-        return manager.close();
+        return manager.close().thenRun(() -> logger.info("Closed Cluster for hosts [{}]", this));
     }
 
     /**
@@ -278,7 +278,7 @@ public final class Cluster {
 
     /**
      * Gets the list of hosts that the {@code Cluster} was able to connect to.  A {@link Host} is assumed unavailable
-     * until a connection to it is proven to be present.  This will not happen until the {@link Client} submits
+     * until a connection to it is established from the {@link ConnectionPool}.  This will not happen until the {@link Client} submits
      * requests that succeed in reaching a server at the {@link Host} or {@link Client#init()} is called which
      * initializes the {@link ConnectionPool} for the {@link Client} itself.  The number of available hosts returned
      * from this method will change as different servers come on and offline.
@@ -540,18 +540,18 @@ public final class Cluster {
         private MessageSerializer serializer = Serializers.GRAPHBINARY_V1D0.simpleInstance();
         private int nioPoolSize = Runtime.getRuntime().availableProcessors();
         private int workerPoolSize = Runtime.getRuntime().availableProcessors() * 2;
-        private int minConnectionPoolSize = ConnectionPool.MIN_POOL_SIZE;
-        private int maxConnectionPoolSize = ConnectionPool.MAX_POOL_SIZE;
-        private int minSimultaneousUsagePerConnection = ConnectionPool.MIN_SIMULTANEOUS_USAGE_PER_CONNECTION;
-        private int maxSimultaneousUsagePerConnection = ConnectionPool.MAX_SIMULTANEOUS_USAGE_PER_CONNECTION;
-        private int maxInProcessPerConnection = Connection.MAX_IN_PROCESS;
-        private int minInProcessPerConnection = Connection.MIN_IN_PROCESS;
-        private int maxWaitForConnection = Connection.MAX_WAIT_FOR_CONNECTION;
-        private int maxWaitForSessionClose = Connection.MAX_WAIT_FOR_SESSION_CLOSE;
-        private int maxContentLength = Connection.MAX_CONTENT_LENGTH;
-        private int reconnectInterval = Connection.RECONNECT_INTERVAL;
-        private int resultIterationBatchSize = Connection.RESULT_ITERATION_BATCH_SIZE;
-        private long keepAliveInterval = Connection.KEEP_ALIVE_INTERVAL;
+        private int minConnectionPoolSize = ConnectionPool.DEFAULT_MIN_POOL_SIZE;
+        private int maxConnectionPoolSize = ConnectionPool.DEFAULT_MAX_POOL_SIZE;
+        private int minSimultaneousUsagePerConnection = ConnectionPool.DEFAULT_MIN_SIMULTANEOUS_USAGE_PER_CONNECTION;
+        private int maxSimultaneousUsagePerConnection = ConnectionPool.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION;
+        private int maxInProcessPerConnection = Connection.DEFAULT_MAX_IN_PROCESS;
+        private int minInProcessPerConnection = Connection.DEFAULT_MIN_IN_PROCESS;
+        private int maxWaitForConnection = Connection.DEFAULT_MAX_WAIT_FOR_CONNECTION;
+        private int maxWaitForSessionClose = Connection.DEFAULT_MAX_WAIT_FOR_SESSION_CLOSE;
+        private int maxContentLength = Connection.DEFAULT_MAX_CONTENT_LENGTH;
+        private int reconnectInterval = Connection.DEFAULT_RECONNECT_INTERVAL;
+        private int resultIterationBatchSize = Connection.DEFAULT_RESULT_ITERATION_BATCH_SIZE;
+        private long keepAliveInterval = Connection.DEFAULT_KEEP_ALIVE_INTERVAL;
         private String channelizer = Channelizer.WebSocketChannelizer.class.getName();
         private boolean enableSsl = false;
         private String keyStore = null;
@@ -729,7 +729,11 @@ public final class Cluster {
         /**
          * The minimum number of in-flight requests that can occur on a {@link Connection} before it is considered
          * for closing on return to the {@link ConnectionPool}.
+         *
+         * @deprecated As of release 3.4.3, not replaced, this parameter is ignored.
+         * @see <a href="https://issues.apache.org/jira/browse/TINKERPOP-2205">TINKERPOP-2205</a>
          */
+        @Deprecated
         public Builder minInProcessPerConnection(final int minInProcessPerConnection) {
             this.minInProcessPerConnection = minInProcessPerConnection;
             return this;
@@ -742,7 +746,17 @@ public final class Cluster {
          * the total number of requests on a {@link Connection}.  In other words, a {@link Connection} might
          * be borrowed once to have multiple requests executed against it.  This number controls the maximum
          * number of requests whereas {@link #maxInProcessPerConnection} controls the times borrowed.
+         *
+         * @deprecated As of release 3.4.3, replaced by {@link #maxConnectionPoolSize}. For backward
+         * compatibility it is still used to approximate the amount of parallelism required. In future versions, the
+         * approximation logic will be removed and dependency on this parameter will be completely eliminated.
+         * To disable the dependency on this parameter right now, explicitly set the value of
+         * {@link #maxInProcessPerConnection} and {@link #maxSimultaneousUsagePerConnection} to zero.
+         *
+         * @see ConnectionPoolImpl#calculateMaxPoolSize(Settings.ConnectionPoolSettings) for approximation logic.
+         * @see <a href="https://issues.apache.org/jira/browse/TINKERPOP-2205">TINKERPOP-2205</a>
          */
+        @Deprecated
         public Builder maxInProcessPerConnection(final int maxInProcessPerConnection) {
             this.maxInProcessPerConnection = maxInProcessPerConnection;
             return this;
@@ -754,7 +768,17 @@ public final class Cluster {
          * {@link Connection} may queue requests too quickly, rather than wait for an available {@link Connection}
          * or create a fresh one.  If set too small, the {@link Connection} will show as busy very quickly thus
          * forcing waits for available {@link Connection} instances in the pool when there is more capacity available.
+         *
+         * @deprecated As of release 3.4.3, replaced by {@link #maxConnectionPoolSize}. For backward
+         * compatibility it is still used to approximate the amount of parallelism required. In future versions, the
+         * approximation logic will be removed and dependency on this parameter will be completely eliminated.
+         * To disable the dependency on this parameter right now, explicitly set the value of
+         * {@link #maxInProcessPerConnection} and {@link #maxSimultaneousUsagePerConnection} to zero.
+         *
+         * @see ConnectionPoolImpl#calculateMaxPoolSize(Settings.ConnectionPoolSettings) for approximation logic.
+         * @see <a href="https://issues.apache.org/jira/browse/TINKERPOP-2205">TINKERPOP-2205</a>
          */
+        @Deprecated
         public Builder maxSimultaneousUsagePerConnection(final int maxSimultaneousUsagePerConnection) {
             this.maxSimultaneousUsagePerConnection = maxSimultaneousUsagePerConnection;
             return this;
@@ -767,7 +791,11 @@ public final class Cluster {
          * too large and {@link Connection} that isn't busy will continue to consume resources when it is not being
          * used.  Set too small and {@link Connection} instances will be destroyed when the driver might still be
          * busy.
+         *
+         * @deprecated As of release 3.4.3, not replaced, this parameter is ignored.
+         * @see <a href="https://issues.apache.org/jira/browse/TINKERPOP-2205">TINKERPOP-2205</a>
          */
+        @Deprecated
         public Builder minSimultaneousUsagePerConnection(final int minSimultaneousUsagePerConnection) {
             this.minSimultaneousUsagePerConnection = minSimultaneousUsagePerConnection;
             return this;
@@ -784,7 +812,11 @@ public final class Cluster {
         /**
          * The minimum size of the {@link ConnectionPool}.  When the {@link Client} is started, {@link Connection}
          * objects will be initially constructed to this size.
+         *
+         * @deprecated As of release 3.4.3, not replaced, this parameter is ignored.
+         * @see <a href="https://issues.apache.org/jira/browse/TINKERPOP-2205">TINKERPOP-2205</a>
          */
+        @Deprecated
         public Builder minConnectionPoolSize(final int minSize) {
             this.minConnectionPoolSize = minSize;
             return this;
@@ -950,6 +982,7 @@ public final class Cluster {
 
         public Factory(final int nioPoolSize) {
             final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("gremlin-driver-loop-%d").build();
+            // TODO: Enable epoll if available.
             group = new NioEventLoopGroup(nioPoolSize, threadFactory);
         }
 
@@ -1037,33 +1070,15 @@ public final class Cluster {
         }
 
         private void validateBuilder(final Builder builder) {
-            if (builder.minInProcessPerConnection < 0)
-                throw new IllegalArgumentException("minInProcessPerConnection must be greater than or equal to zero");
-
-            if (builder.maxInProcessPerConnection < 1)
-                throw new IllegalArgumentException("maxInProcessPerConnection must be greater than zero");
-
-            if (builder.minInProcessPerConnection > builder.maxInProcessPerConnection)
-                throw new IllegalArgumentException("maxInProcessPerConnection cannot be less than minInProcessPerConnection");
-
-            if (builder.minSimultaneousUsagePerConnection < 0)
-                throw new IllegalArgumentException("minSimultaneousUsagePerConnection must be greater than or equal to zero");
+            if (builder.maxInProcessPerConnection < 0)
+                throw new IllegalArgumentException("maxInProcessPerConnection must be greater than or equal to zero");
 
-            if (builder.maxSimultaneousUsagePerConnection < 1)
-                throw new IllegalArgumentException("maxSimultaneousUsagePerConnection must be greater than zero");
-
-            if (builder.minSimultaneousUsagePerConnection > builder.maxSimultaneousUsagePerConnection)
-                throw new IllegalArgumentException("maxSimultaneousUsagePerConnection cannot be less than minSimultaneousUsagePerConnection");
-
-            if (builder.minConnectionPoolSize < 0)
-                throw new IllegalArgumentException("minConnectionPoolSize must be greater than or equal to zero");
+            if (builder.maxSimultaneousUsagePerConnection < 0)
+                throw new IllegalArgumentException("maxSimultaneousUsagePerConnection must be greater than or equal to zero");
 
             if (builder.maxConnectionPoolSize < 1)
                 throw new IllegalArgumentException("maxConnectionPoolSize must be greater than zero");
 
-            if (builder.minConnectionPoolSize > builder.maxConnectionPoolSize)
-                throw new IllegalArgumentException("maxConnectionPoolSize cannot be less than minConnectionPoolSize");
-
             if (builder.maxWaitForConnection < 1)
                 throw new IllegalArgumentException("maxWaitForConnection must be greater than zero");
 
@@ -1133,7 +1148,7 @@ public final class Cluster {
                 closeIt.complete(null);
             });
 
-            // Prevent the executor from accepting new tasks while still allowing enqueued tasks to complete
+            // executor may be required for proper closing. after completion of close, close the executor
+            // Prevent the executor from accepting new tasks while still allowing enqueued tasks to complete	            // executor may be required for proper closing. after completion of close, close the executor
             executor.shutdown();
 
             return closeIt;
@@ -1145,7 +1160,7 @@ public final class Cluster {
 
         @Override
         public String toString() {
-            return String.join(", ", contactPoints.stream().map(InetSocketAddress::toString).collect(Collectors.<String>toList()));
+            return contactPoints.stream().map(InetSocketAddress::toString).collect(Collectors.joining(","));
         }
     }
 }
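With the validation relaxed as above, an application that previously tuned the now-deprecated per-connection knobs can move to the single `maxConnectionPoolSize` setting. A migration sketch (the values are illustrative):

[source,java]
----
import org.apache.tinkerpop.gremlin.driver.Cluster;

public class BuilderMigration {
    public static void main(final String[] args) {
        final Cluster cluster = Cluster.build("localhost")
                // previously: .minConnectionPoolSize(2).maxConnectionPoolSize(8)
                //             .maxInProcessPerConnection(4).maxSimultaneousUsagePerConnection(16)
                .maxConnectionPoolSize(32)             // total concurrent requests to the host
                .maxInProcessPerConnection(0)          // opt out of the backward-compatibility
                .maxSimultaneousUsagePerConnection(0)  // approximation, per the deprecation notes above
                .create();
        cluster.close();
    }
}
----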
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index e700d49..f02a501 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -18,394 +18,69 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
-import io.netty.handler.codec.CodecException;
-import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
-import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
-import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelPromise;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.UUID;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * A single connection to a Gremlin Server instance.
+ * Connection is a misnomer here and has been kept for historical reasons. This entity represents the lifetime of a
+ * single Gremlin interaction with the server. Each connection uses a persistent WebSocket {@link Channel}
+ * to send/receive data from the server. The associated {@link Channel} is released once the request associated
+ * with this connection has completed and all of its results have been read.
+ *
+ * A Connection is managed by a {@link ConnectionPool}.
  *
- * @author Stephen Mallette (http://stephen.genoprime.com)
+ * @see ConnectionPool
  */
-final class Connection {
-    private static final Logger logger = LoggerFactory.getLogger(Connection.class);
-
-    private final Channel channel;
-    private final URI uri;
-    private final ConcurrentMap<UUID, ResultQueue> pending = new ConcurrentHashMap<>();
-    private final Cluster cluster;
-    private final Client client;
-    private final ConnectionPool pool;
-    private final long keepAliveInterval;
-
-    public static final int MAX_IN_PROCESS = 4;
-    public static final int MIN_IN_PROCESS = 1;
-    public static final int MAX_WAIT_FOR_CONNECTION = 3000;
-    public static final int MAX_WAIT_FOR_SESSION_CLOSE = 3000;
-    public static final int MAX_CONTENT_LENGTH = 65536;
-
-    public static final int RECONNECT_INTERVAL = 1000;
-    public static final int RESULT_ITERATION_BATCH_SIZE = 64;
-    public static final long KEEP_ALIVE_INTERVAL = 180000;
-
+public interface Connection {
     /**
-     * When a {@code Connection} is borrowed from the pool, this number is incremented to indicate the number of
-     * times it has been taken and is decremented when it is returned.  This number is one indication as to how
-     * busy a particular {@code Connection} is.
+     * @deprecated As of release 3.5.0, replaced by {@link ConnectionPool#DEFAULT_MAX_POOL_SIZE}. For backward
+     * compatibility it is still used to approximate the amount of parallelism required. In future versions, the
+     * approximation logic will be removed and dependency on this parameter will be completely eliminated.
+     * To disable the dependency on this parameter right now, explicitly set the value of
+     * {@link Settings.ConnectionPoolSettings#maxInProcessPerConnection} and {@link Settings.ConnectionPoolSettings#maxSimultaneousUsagePerConnection}
+     * to 0.
+     *
+     * @see ConnectionPoolImpl#calculateMaxPoolSize(Settings.ConnectionPoolSettings) for approximation
+     * logic.
+     * @see <a href="https://issues.apache.org/jira/browse/TINKERPOP-2205">TINKERPOP-2205</a>
      */
-    public final AtomicInteger borrowed = new AtomicInteger(0);
-    private final AtomicReference<Class<Channelizer>> channelizerClass = new AtomicReference<>(null);
-
-    private final int maxInProcess;
-
-    private final String connectionLabel;
-
-    private final Channelizer channelizer;
-
-    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
-    private final AtomicBoolean shutdownInitiated = new AtomicBoolean(false);
-    private final AtomicReference<ScheduledFuture> keepAliveFuture = new AtomicReference<>();
-
-    public Connection(final URI uri, final ConnectionPool pool, final int maxInProcess) throws ConnectionException {
-        this.uri = uri;
-        this.cluster = pool.getCluster();
-        this.client = pool.getClient();
-        this.pool = pool;
-        this.maxInProcess = maxInProcess;
-        this.keepAliveInterval = pool.settings().keepAliveInterval;
-
-        connectionLabel = String.format("Connection{host=%s}", pool.host);
-
-        if (cluster.isClosing()) throw new IllegalStateException("Cannot open a connection with the cluster after close() is called");
-
-        final Bootstrap b = this.cluster.getFactory().createBootstrap();
-        try {
-            if (channelizerClass.get() == null) {
-                channelizerClass.compareAndSet(null, (Class<Channelizer>) Class.forName(cluster.connectionPoolSettings().channelizer));
-            }
-
-            channelizer = channelizerClass.get().newInstance();
-            channelizer.init(this);
-            b.channel(NioSocketChannel.class).handler(channelizer);
-
-            channel = b.connect(uri.getHost(), uri.getPort()).sync().channel();
-            channelizer.connected();
-
-            logger.info("Created new connection for {}", uri);
-        } catch (Exception ie) {
-            logger.debug("Error opening connection on {}", uri);
-            throw new ConnectionException(uri, "Could not open connection", ie);
-        }
-    }
+    @Deprecated
+    int DEFAULT_MAX_IN_PROCESS = 4;
 
     /**
-     * A connection can only have so many things in process happening on it at once, where "in process" refers to
-     * the maximum number of in-process requests less the number of pending responses.
+     * @deprecated As of release 3.5.0, not replaced, this setting is ignored.
+     * @see <a href="https://issues.apache.org/jira/browse/TINKERPOP-2205">TINKERPOP-2205</a>
      */
-    public int availableInProcess() {
-        // no need for a negative available amount - not sure that the pending size can ever exceed maximum, but
-        // better to avoid the negatives that would ensue if it did
-        return Math.max(0, maxInProcess - pending.size());
-    }
+    @Deprecated
+    int DEFAULT_MIN_IN_PROCESS = 1;
+    int DEFAULT_MAX_WAIT_FOR_CONNECTION = 3000;
+    int DEFAULT_MAX_WAIT_FOR_SESSION_CLOSE = 3000;
+    int DEFAULT_MAX_CONTENT_LENGTH = 65536;
+    int DEFAULT_RECONNECT_INTERVAL = 1000;
+    int DEFAULT_RESULT_ITERATION_BATCH_SIZE = 64;
+    long DEFAULT_KEEP_ALIVE_INTERVAL = 1800000;
 
     /**
-     * Consider a connection as dead if the underlying channel is not connected.
+     * Write a Gremlin request to the server.
      *
-     * Note: A dead connection does not necessarily imply that the server is unavailable. Additional checks
-     * should be performed to mark the server host as unavailable.
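+     * A minimal usage sketch (assumes a {@link Connection} already prepared by the pool and a
+     * pre-built {@link RequestMessage} named {@code request}):
+     * <pre>{@code
+     * final CompletableFuture<ResultSet> resultSetFuture = new CompletableFuture<>();
+     * connection.write(request, resultSetFuture);
+     * resultSetFuture.thenCompose(ResultSet::all)
+     *                .thenAccept(results -> results.forEach(System.out::println));
+     * }</pre>
+     *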
+     * @param requestMessage Gremlin {@link RequestMessage} that is being sent to the server.
+     * @param resultQueueFuture Future that will contain the {@link ResultSet} used to
+     *                          stream the results of the {@link RequestMessage}
+     * @return ChannelPromise Promise that is completed once the request has been written to the server.
      */
-    public boolean isDead() {
-        return (channel !=null && !channel.isActive());
-    }
-
-    boolean isClosing() {
-        return closeFuture.get() != null;
-    }
-
-    URI getUri() {
-        return uri;
-    }
-
-    Cluster getCluster() {
-        return cluster;
-    }
-
-    Client getClient() {
-        return client;
-    }
-
-    ConcurrentMap<UUID, ResultQueue> getPending() {
-        return pending;
-    }
-
-    public synchronized CompletableFuture<Void> closeAsync() {
-        if (isClosing()) return closeFuture.get();
-
-        final CompletableFuture<Void> future = new CompletableFuture<>();
-        closeFuture.set(future);
-
-        // stop any pings being sent at the server for keep-alive
-        final ScheduledFuture keepAlive = keepAliveFuture.get();
-        if (keepAlive != null) keepAlive.cancel(true);
-
-        // make sure all requests in the queue are fully processed before killing.  if they are then shutdown
-        // can be immediate.  if not this method will signal the readCompleted future defined in the write()
-        // operation to check if it can close.  in this way the connection no longer receives writes, but
-        // can continue to read. If a request never comes back the future won't get fulfilled and the connection
-        // will maintain a "pending" request, that won't quite ever go away.  The build up of such a dead requests
-        // on a connection in the connection pool will force the pool to replace the connection for a fresh one.
-        if (isOkToClose()) {
-            if (null == channel)
-                future.complete(null);
-            else
-                shutdown(future);
-        } else {
-            // there may be some pending requests. schedule a job to wait for those to complete and then shutdown
-            new CheckForPending(future).runUntilDone(cluster.executor(), 1000, TimeUnit.MILLISECONDS);
-        }
-
-        return future;
-    }
-
-    public void close() {
-        try {
-            closeAsync().get();
-        } catch (Exception ex) {
-            throw new RuntimeException(ex);
-        }
-    }
-
-    public ChannelPromise write(final RequestMessage requestMessage, final CompletableFuture<ResultSet> future) {
-        // once there is a completed write, then create a traverser for the result set and complete
-        // the promise so that the client knows that that it can start checking for results.
-        final Connection thisConnection = this;
-
-        final ChannelPromise requestPromise = channel.newPromise()
-                .addListener(f -> {
-                    if (!f.isSuccess()) {
-                        if (logger.isDebugEnabled())
-                            logger.debug(String.format("Write on connection %s failed", thisConnection.getConnectionInfo()), f.cause());
-
-                        handleConnectionCleanupOnError(thisConnection, f.cause());
-
-                        cluster.executor().submit(() -> future.completeExceptionally(f.cause()));
-                    } else {
-                        final LinkedBlockingQueue<Result> resultLinkedBlockingQueue = new LinkedBlockingQueue<>();
-                        final CompletableFuture<Void> readCompleted = new CompletableFuture<>();
-
-                        // the callback for when the read was successful, meaning that ResultQueue.markComplete()
-                        // was called
-                        readCompleted.thenAcceptAsync(v -> {
-                            thisConnection.returnToPool();
-                            tryShutdown();
-                        }, cluster.executor());
-
-                        // the callback for when the read failed. a failed read means the request went to the server
-                        // and came back with a server-side error of some sort.  it means the server is responsive
-                        // so this isn't going to be like a potentially dead host situation which is handled above on a failed
-                        // write operation.
-                        readCompleted.exceptionally(t -> {
-
-                            handleConnectionCleanupOnError(thisConnection, t);
-
-                            // close was signaled in closeAsync() but there were pending messages at that time. attempt
-                            // the shutdown if the returned result cleared up the last pending message
-                            tryShutdown();
-
-                            return null;
-                        });
-
-                        final ResultQueue handler = new ResultQueue(resultLinkedBlockingQueue, readCompleted);
-                        pending.put(requestMessage.getRequestId(), handler);
-                        cluster.executor().submit(() -> future.complete(
-                                new ResultSet(handler, cluster.executor(), readCompleted, requestMessage, pool.host)));
-                    }
-                });
-        channel.writeAndFlush(requestMessage, requestPromise);
-
-        // try to keep the connection alive if the channel allows such things - websockets will
-        if (channelizer.supportsKeepAlive() && keepAliveInterval > 0) {
-
-            final ScheduledFuture oldKeepAliveFuture = keepAliveFuture.getAndSet(cluster.executor().scheduleAtFixedRate(() -> {
-                logger.debug("Request sent to server to keep {} alive", thisConnection);
-                try {
-                    channel.writeAndFlush(channelizer.createKeepAliveMessage());
-                } catch (Exception ex) {
-                    // will just log this for now - a future real request can be responsible for the failure that
-                    // marks the host as dead. this also may not mean the host is actually dead. more robust handling
-                    // is in play for real requests, not this simple ping
-                    logger.warn(String.format("Keep-alive did not succeed on %s", thisConnection), ex);
-                }
-            }, keepAliveInterval, keepAliveInterval, TimeUnit.MILLISECONDS));
-
-            // try to cancel the old future if it's still un-executed - no need to ping since a new write has come
-            // through on the connection
-            if (oldKeepAliveFuture != null) oldKeepAliveFuture.cancel(true);
-        }
-
-        return requestPromise;
-    }
-
-    public void returnToPool() {
-        try {
-            if (pool != null) pool.returnConnection(this);
-        } catch (ConnectionException ce) {
-            if (logger.isDebugEnabled())
-                logger.debug("Returned {} connection to {} but an error occurred - {}", this.getConnectionInfo(), pool, ce.getMessage());
-        }
-    }
-
-    /*
-     * In the event of an IOException (typically means that the Connection might have been closed from the server side
-     * - this is typical in situations like when a request is sent that exceeds maxContentLength and the server closes
-     * the channel on its side) or other exceptions that indicate a non-recoverable state for the Connection object
-     * (a netty CorruptedFrameException is a good example of that), the Connection cannot simply be returned to the
-     * pool as future uses will end with refusal from the server and make it appear as a dead host as the write will
-     * not succeed. Instead, the Connection needs to be replaced in these scenarios which destroys the dead channel
-     * on the client and allows a new one to be reconstructed.
-     */
-    private void handleConnectionCleanupOnError(final Connection thisConnection, final Throwable t) {
-        if (thisConnection.isDead() || t instanceof IOException || t instanceof CodecException) {
-            if (pool != null) pool.replaceConnection(thisConnection);
-        } else {
-            thisConnection.returnToPool();
-        }
-    }
-
-    private boolean isOkToClose() {
-        return pending.isEmpty() || (channel !=null && !channel.isOpen()) || !pool.host.isAvailable();
-    }
+    ChannelPromise write(RequestMessage requestMessage, CompletableFuture<ResultSet> resultQueueFuture);
 
     /**
-     * Close was signaled in closeAsync() but there were pending messages at that time. This method attempts the
-     * shutdown if the returned result cleared up the last pending message.
+     * @return Channel The underlying Netty {@link Channel} which is used by this connection.
      */
-    private void tryShutdown() {
-        if (isClosing() && isOkToClose())
-            shutdown(closeFuture.get());
-    }
-
-    private synchronized void shutdown(final CompletableFuture<Void> future) {
-        // shutdown can be called directly from closeAsync() or after write() and therefore this method should only
-        // be called once. once shutdown is initiated, it shouldn't be executed a second time or else it sends more
-        // messages at the server and leads to ugly log messages over there.
-        if (shutdownInitiated.compareAndSet(false, true)) {
-            final String connectionInfo = this.getConnectionInfo();
-
-            // maybe this should be delegated back to the Client implementation??? kinda weird to instanceof here.....
-            if (client instanceof Client.SessionedClient) {
-                final boolean forceClose = client.getSettings().getSession().get().isForceClosed();
-                final RequestMessage closeMessage = client.buildMessage(
-                        RequestMessage.build(Tokens.OPS_CLOSE).addArg(Tokens.ARGS_FORCE, forceClose)).create();
-
-                final CompletableFuture<ResultSet> closed = new CompletableFuture<>();
-                write(closeMessage, closed);
-
-                try {
-                    // make sure we get a response here to validate that things closed as expected.  on error, we'll let
-                    // the server try to clean up on its own.  the primary error here should probably be related to
-                    // protocol issues which should not be something a user has to fuss with.
-                    closed.join().all().get(cluster.connectionPoolSettings().maxWaitForSessionClose, TimeUnit.MILLISECONDS);
-                } catch (TimeoutException ex) {
-                    final String msg = String.format(
-                            "Timeout while trying to close connection on %s - force closing - server will close session on shutdown or expiration.",
-                            ((Client.SessionedClient) client).getSessionId());
-                    logger.warn(msg, ex);
-                } catch (Exception ex) {
-                    final String msg = String.format(
-                            "Encountered an error trying to close connection on %s - force closing - server will close session on shutdown or expiration.",
-                            ((Client.SessionedClient) client).getSessionId());
-                    logger.warn(msg, ex);
-                }
-            }
-
-            channelizer.close(channel);
-
-            final ChannelPromise promise = channel.newPromise();
-            promise.addListener(f -> {
-                if (f.cause() != null) {
-                    future.completeExceptionally(f.cause());
-                } else {
-                    if (logger.isDebugEnabled())
-                        logger.debug("{} destroyed successfully.", connectionInfo);
-
-                    future.complete(null);
-                }
-            });
-
-            channel.close(promise);
-        }
-    }
-
-    public String getConnectionInfo() {
-        return String.format("Connection{host=%s, isDead=%s, borrowed=%s, pending=%s}",
-                pool.host, isDead(), borrowed, pending.size());
-    }
-
-    @Override
-    public String toString() {
-        return connectionLabel;
-    }
+    Channel getChannel();
 
     /**
-     * Self-cancelling tasks that periodically checks for the pending queue to clear before shutting down the
-     * {@code Connection}. Once it does that, it self cancels the scheduled job in the executor.
+     * @return Host The {@link Host} this connection is connected to.
      */
-    private final class CheckForPending implements Runnable {
-        private volatile ScheduledFuture<?> self;
-        private final CompletableFuture<Void> future;
-
-        CheckForPending(final CompletableFuture<Void> future) {
-            this.future = future;
-        }
-
-        @Override
-        public void run() {
-            logger.info("Checking for pending messages to complete before close on {}", this);
-
-            if (isOkToClose()) {
-                shutdown(future);
-                boolean interrupted = false;
-                try {
-                    while(null == self) {
-                        try {
-                            Thread.sleep(1);
-                        } catch (InterruptedException e) {
-                            interrupted = true;
-                        }
-                    }
-                    self.cancel(false);
-                } finally {
-                    if(interrupted) {
-                        Thread.currentThread().interrupt();
-                    }
-                }
-            }
-        }
-
-        void runUntilDone(final ScheduledExecutorService executor, final long period, final TimeUnit unit) {
-            self = executor.scheduleAtFixedRate(this, period, period, unit);
-        }
-    }
-}
+    Host getHost();
+}
\ No newline at end of file
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index 332731e..9d8804f 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -18,476 +18,92 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
-import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
-import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
-import org.apache.tinkerpop.gremlin.util.TimeUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import io.netty.channel.Channel;
+import io.netty.channel.group.ChannelGroup;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
+import java.net.ConnectException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * @author Stephen Mallette (http://stephen.genoprime.com)
+ * A connection pool is responsible for maintaining the re-usable resources required to send
+ * requests to a specific server. It is also the gatekeeper for the number of simultaneous
+ * requests to that server.
+ *
+ * More specifically, it associates a Netty {@link Channel} with a {@link Connection}.
+ *
+ * A typical workflow for the lifetime of a Gremlin request is as follows:
+ * 1. A connection pool attached to a host is set up on initialization.
+ * 2. The client borrows a connection from the connection pool.
+ * 3. The connection pool chooses a healthy and available channel (or creates one if necessary)
+ *    and creates a {@link Connection} associated with it.
+ * 4. The lifecycle of the {@link Connection} ends when the results have been read and it is
+ *    returned to the pool.
+ * 5. The connection pool reclaims the channel, which is then free to be used for another request.
-final class ConnectionPool {
-    private static final Logger logger = LoggerFactory.getLogger(ConnectionPool.class);
-
-    public static final int MIN_POOL_SIZE = 2;
-    public static final int MAX_POOL_SIZE = 8;
-    public static final int MIN_SIMULTANEOUS_USAGE_PER_CONNECTION = 8;
-    public static final int MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = 16;
-
-    public final Host host;
-    private final Cluster cluster;
-    private final Client client;
-    private final List<Connection> connections;
-    private final AtomicInteger open;
-    private final Set<Connection> bin = new CopyOnWriteArraySet<>();
-    private final int minPoolSize;
-    private final int maxPoolSize;
-    private final int minSimultaneousUsagePerConnection;
-    private final int maxSimultaneousUsagePerConnection;
-    private final int minInProcess;
-    private final String poolLabel;
-
-    private final AtomicInteger scheduledForCreation = new AtomicInteger();
-
-    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
-
-    private volatile int waiter = 0;
-    private final Lock waitLock = new ReentrantLock(true);
-    private final Condition hasAvailableConnection = waitLock.newCondition();
-
-    public ConnectionPool(final Host host, final Client client) {
-        this(host, client, Optional.empty(), Optional.empty());
-    }
-
-    public ConnectionPool(final Host host, final Client client, final Optional<Integer> overrideMinPoolSize,
-                          final Optional<Integer> overrideMaxPoolSize) {
-        this.host = host;
-        this.client = client;
-        this.cluster = client.cluster;
-        poolLabel = String.format("Connection Pool {host=%s}", host);
-
-        final Settings.ConnectionPoolSettings settings = settings();
-        this.minPoolSize = overrideMinPoolSize.orElse(settings.minSize);
-        this.maxPoolSize = overrideMaxPoolSize.orElse(settings.maxSize);
-        this.minSimultaneousUsagePerConnection = settings.minSimultaneousUsagePerConnection;
-        this.maxSimultaneousUsagePerConnection = settings.maxSimultaneousUsagePerConnection;
-        this.minInProcess = settings.minInProcessPerConnection;
-
-        this.connections = new CopyOnWriteArrayList<>();
-
-        try {
-            for (int i = 0; i < minPoolSize; i++)
-                this.connections.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
-        } catch (ConnectionException ce) {
-            // ok if we don't get it initialized here - when a request is attempted in a connection from the
-            // pool it will try to create new connections as needed.
-            logger.debug("Could not initialize connections in pool for {} - pool size at {}", host, this.connections.size());
-            considerHostUnavailable();
-        }
-
-        this.open = new AtomicInteger(connections.size());
-
-        logger.info("Opening connection pool on {} with core size of {}", host, minPoolSize);
-    }
-
-    public Settings.ConnectionPoolSettings settings() {
-        return cluster.connectionPoolSettings();
-    }
-
-    public Connection borrowConnection(final long timeout, final TimeUnit unit) throws TimeoutException, ConnectionException {
-        logger.debug("Borrowing connection from pool on {} - timeout in {} {}", host, timeout, unit);
-
-        if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
-
-        final Connection leastUsedConn = selectLeastUsed();
-
-        if (connections.isEmpty()) {
-            logger.debug("Tried to borrow connection but the pool was empty for {} - scheduling pool creation and waiting for connection", host);
-            for (int i = 0; i < minPoolSize; i++) {
-                // If many connections are borrowed at the same time there needs to be a check to make sure no
-                // additional ones get scheduled for creation
-                if (scheduledForCreation.get() < minPoolSize) {
-                    scheduledForCreation.incrementAndGet();
-                    newConnection();
-                }
-            }
-
-            return waitForConnection(timeout, unit);
-        }
-
-        if (null == leastUsedConn) {
-            if (isClosed())
-                throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
-            logger.debug("Pool was initialized but a connection could not be selected earlier - waiting for connection on {}", host);
-            return waitForConnection(timeout, unit);
-        }
-
-        // if the number borrowed on the least used connection exceeds the max allowed and the pool size is
-        // not at maximum then consider opening a connection
-        final int currentPoolSize = connections.size();
-        if (leastUsedConn.borrowed.get() >= maxSimultaneousUsagePerConnection && currentPoolSize < maxPoolSize) {
-            if (logger.isDebugEnabled())
-                logger.debug("Least used {} on {} exceeds maxSimultaneousUsagePerConnection but pool size {} < maxPoolSize - consider new connection",
-                        leastUsedConn.getConnectionInfo(), host, currentPoolSize);
-            considerNewConnection();
-        }
-
-        while (true) {
-            final int borrowed = leastUsedConn.borrowed.get();
-            final int availableInProcess = leastUsedConn.availableInProcess();
-
-            if (borrowed >= maxSimultaneousUsagePerConnection && leastUsedConn.availableInProcess() == 0) {
-                logger.debug("Least used connection selected from pool for {} but borrowed [{}] >= availableInProcess [{}] - wait",
-                        host, borrowed, availableInProcess);
-                return waitForConnection(timeout, unit);
-            }
-
-            if (leastUsedConn.borrowed.compareAndSet(borrowed, borrowed + 1)) {
-                if (logger.isDebugEnabled())
-                    logger.debug("Return least used {} on {}", leastUsedConn.getConnectionInfo(), host);
-                return leastUsedConn;
-            }
-        }
-    }
-
-    public void returnConnection(final Connection connection) throws ConnectionException {
-        logger.debug("Attempting to return {} on {}", connection, host);
-        if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
-
-        final int borrowed = connection.borrowed.decrementAndGet();
-
-        if (connection.isDead()) {
-            logger.debug("Marking {} as dead", this.host);
-            this.replaceConnection(connection);
-        } else {
-            if (bin.contains(connection) && borrowed == 0) {
-                logger.debug("{} is already in the bin and it has no inflight requests so it is safe to close", connection);
-                if (bin.remove(connection))
-                    connection.closeAsync();
-                return;
-            }
-
-            // destroy a connection that exceeds the minimum pool size - it does not have the right to live if it
-            // isn't busy. replace a connection that has a low available in process count which likely means that
-            // it's backing up with requests that might never have returned. consider the maxPoolSize in this condition
-            // because if it is equal to 1 (which it is for a session) then there is no need to replace the connection
-            // as it will be responsible for every single request. if neither of these scenarios are met then let the
-            // world know the connection is available.
-            final int poolSize = connections.size();
-            final int availableInProcess = connection.availableInProcess();
-            if (poolSize > minPoolSize && borrowed <= minSimultaneousUsagePerConnection) {
-                if (logger.isDebugEnabled())
-                    logger.debug("On {} pool size of {} > minPoolSize {} and borrowed of {} <= minSimultaneousUsagePerConnection {} so destroy {}",
-                            host, poolSize, minPoolSize, borrowed, minSimultaneousUsagePerConnection, connection.getConnectionInfo());
-                destroyConnection(connection);
-            } else if (availableInProcess < minInProcess && maxPoolSize > 1) {
-                if (logger.isDebugEnabled())
-                    logger.debug("On {} availableInProcess {} < minInProcess {} so replace {}", host, availableInProcess, minInProcess, connection.getConnectionInfo());
-                replaceConnection(connection);
-            } else
-                announceAvailableConnection();
-        }
-    }
-
-    Client getClient() {
-        return client;
-    }
-
-    Cluster getCluster() {
-        return cluster;
-    }
-
-    public boolean isClosed() {
-        return closeFuture.get() != null;
-    }
-
+public interface ConnectionPool {
+    int DEFAULT_MAX_POOL_SIZE = 8;
     /**
-     * Permanently kills the pool.
+     * @deprecated As of release 3.5.0, not replaced, this setting is ignored.
+     * @see <a href="https://issues.apache.org/jira/browse/TINKERPOP-2205">TINKERPOP-2205</a>
      */
-    public synchronized CompletableFuture<Void> closeAsync() {
-        if (closeFuture.get() != null) return closeFuture.get();
-
-        logger.info("Signalled closing of connection pool on {} with core size of {}", host, minPoolSize);
-
-        announceAllAvailableConnection();
-        final CompletableFuture<Void> future = killAvailableConnections();
-        closeFuture.set(future);
-        return future;
-    }
-
+    @Deprecated
+    int DEFAULT_MIN_POOL_SIZE = 2;
     /**
-     * Required for testing
+     * @deprecated As of release 3.5.0, not replaced, this setting is ignored.
+     * @see <a href="https://issues.apache.org/jira/browse/TINKERPOP-2205">TINKERPOP-2205</a>
      */
-    int numConnectionsWaitingToCleanup() {
-        return bin.size();
-    }
-
-    private CompletableFuture<Void> killAvailableConnections() {
-        final List<CompletableFuture<Void>> futures = new ArrayList<>(connections.size());
-        for (Connection connection : connections) {
-            final CompletableFuture<Void> future = connection.closeAsync();
-            future.thenRun(open::decrementAndGet);
-            futures.add(future);
-        }
-
-        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
-    }
-
-    void replaceConnection(final Connection connection) {
-        logger.debug("Replace {}", connection);
-
-        considerNewConnection();
-        definitelyDestroyConnection(connection);
-    }
-
-    private void considerNewConnection() {
-        logger.debug("Considering new connection on {} where pool size is {}", host, connections.size());
-        while (true) {
-            int inCreation = scheduledForCreation.get();
-
-            logger.debug("There are {} connections scheduled for creation on {}", inCreation, host);
-
-            // don't create more than one at a time
-            if (inCreation >= 1)
-                return;
-            if (scheduledForCreation.compareAndSet(inCreation, inCreation + 1))
-                break;
-        }
-
-        newConnection();
-    }
-
-    private void newConnection() {
-        cluster.executor().submit(() -> {
-            addConnectionIfUnderMaximum();
-            scheduledForCreation.decrementAndGet();
-            return null;
-        });
-    }
-
-    private boolean addConnectionIfUnderMaximum() {
-        while (true) {
-            int opened = open.get();
-            if (opened >= maxPoolSize)
-                return false;
-
-            if (open.compareAndSet(opened, opened + 1))
-                break;
-        }
-
-        if (isClosed()) {
-            open.decrementAndGet();
-            return false;
-        }
-
-        try {
-            connections.add(new Connection(host.getHostUri(), this, settings().maxInProcessPerConnection));
-        } catch (ConnectionException ce) {
-            logger.debug("Connections were under max, but there was an error creating the connection.", ce);
-            open.decrementAndGet();
-            considerHostUnavailable();
-            return false;
-        }
-
-        announceAvailableConnection();
-        return true;
-    }
-
-    private boolean destroyConnection(final Connection connection) {
-        while (true) {
-            int opened = open.get();
-            if (opened <= minPoolSize)
-                return false;
-
-            if (open.compareAndSet(opened, opened - 1))
-                break;
-        }
-
-        definitelyDestroyConnection(connection);
-        return true;
-    }
-
-    private void definitelyDestroyConnection(final Connection connection) {
-        // only add to the bin for future removal if its not already there.
-        if (!bin.contains(connection) && !connection.isClosing()) {
-            bin.add(connection);
-            connections.remove(connection);
-            open.decrementAndGet();
-        }
-
-        // only close the connection for good once it is done being borrowed or when it is dead
-        if (connection.isDead() || connection.borrowed.get() == 0) {
-            if(bin.remove(connection)) {
-                connection.closeAsync();
-                logger.debug("{} destroyed", connection.getConnectionInfo());
-            }
-        }
-    }
-
-    private Connection waitForConnection(final long timeout, final TimeUnit unit) throws TimeoutException, ConnectionException {
-        long start = System.nanoTime();
-        long remaining = timeout;
-        long to = timeout;
-        do {
-            try {
-                awaitAvailableConnection(remaining, unit);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                to = 0;
-            }
-
-            if (isClosed())
-                throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
-
-            final Connection leastUsed = selectLeastUsed();
-            if (leastUsed != null) {
-                while (true) {
-                    final int inFlight = leastUsed.borrowed.get();
-                    final int availableInProcess = leastUsed.availableInProcess();
-                    if (inFlight >= availableInProcess) {
-                        logger.debug("Least used {} on {} has requests borrowed [{}] >= availableInProcess [{}] - may timeout waiting for connection",
-                                leastUsed, host, inFlight, availableInProcess);
-                        break;
-                    }
-
-                    if (leastUsed.borrowed.compareAndSet(inFlight, inFlight + 1)) {
-                        if (logger.isDebugEnabled())
-                            logger.debug("Return least used {} on {} after waiting", leastUsed.getConnectionInfo(), host);
-                        return leastUsed;
-                    }
-                }
-            }
-
-            remaining = to - TimeUtil.timeSince(start, unit);
-            logger.debug("Continue to wait for connection on {} if {} > 0", host, remaining);
-        } while (remaining > 0);
-
-        logger.debug("Timed-out waiting for connection on {} - possibly unavailable", host);
-
-        // if we timeout borrowing a connection that might mean the host is dead (or the timeout was super short).
-        // either way supply a function to reconnect
-        this.considerHostUnavailable();
-
-        throw new TimeoutException("Timed-out waiting for connection on " + host + " - possibly unavailable");
-    }
-
-    public void considerHostUnavailable() {
-        // called when a connection is "dead" due to a non-recoverable error.
-        host.makeUnavailable(this::tryReconnect);
-
-        // if the host is unavailable then we should release the connections
-        connections.forEach(this::definitelyDestroyConnection);
-
-        // let the load-balancer know that the host is acting poorly
-        this.cluster.loadBalancingStrategy().onUnavailable(host);
-    }
-
+    @Deprecated
+    int DEFAULT_MIN_SIMULTANEOUS_USAGE_PER_CONNECTION = 8;
     /**
-     * Attempt to reconnect to the {@link Host} that was previously marked as unavailable.  This method gets called
-     * as part of a schedule in {@link Host} to periodically try to create working connections.
+     * @deprecated As of release 3.5.0, replaced by {@link ConnectionPool#DEFAULT_MAX_POOL_SIZE}. For backward
+     * compatibility it is still used to approximate the amount of parallelism required. In future versions, the
+     * approximation logic will be removed and dependency on this parameter will be completely eliminated.
+     * To disable the dependency on this parameter now, explicitly set
+     * {@link Settings.ConnectionPoolSettings#maxInProcessPerConnection} and
+     * {@link Settings.ConnectionPoolSettings#maxSimultaneousUsagePerConnection} to 0.
+     *
+     * @see ConnectionPoolImpl#calculateMaxPoolSize(Settings.ConnectionPoolSettings) for approximation
+     * logic.
+     * @see <a href="https://issues.apache.org/jira/browse/TINKERPOP-2205">TINKERPOP-2205</a>
      */
-    private boolean tryReconnect(final Host h) {
-        logger.debug("Trying to re-establish connection on {}", h);
-
-        Connection connection = null;
-        try {
-            connection = borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
-            final RequestMessage ping = client.buildMessage(cluster.validationRequest()).create();
-            final CompletableFuture<ResultSet> f = new CompletableFuture<>();
-            connection.write(ping, f);
-            f.get().all().get();
-
-            // host is reconnected and a connection is now available
-            this.cluster.loadBalancingStrategy().onAvailable(h);
-            return true;
-        } catch (Exception ex) {
-            logger.debug("Failed reconnect attempt on {}", h);
-            if (connection != null) definitelyDestroyConnection(connection);
-            return false;
-        }
-    }
-
-    private void announceAvailableConnection() {
-        logger.debug("Announce connection available on {}", host);
-
-        if (waiter == 0)
-            return;
-
-        waitLock.lock();
-        try {
-            hasAvailableConnection.signal();
-        } finally {
-            waitLock.unlock();
-        }
-    }
-
-    private Connection selectLeastUsed() {
-        int minInFlight = Integer.MAX_VALUE;
-        Connection leastBusy = null;
-        for (Connection connection : connections) {
-            final int inFlight = connection.borrowed.get();
-            if (!connection.isDead() && inFlight < minInFlight) {
-                minInFlight = inFlight;
-                leastBusy = connection;
-            }
-        }
-        return leastBusy;
-    }
-
-    private void awaitAvailableConnection(long timeout, TimeUnit unit) throws InterruptedException {
-        logger.debug("Wait {} {} for an available connection on {} with {}", timeout, unit, host, Thread.currentThread());
-
-        waitLock.lock();
-        waiter++;
-        try {
-            hasAvailableConnection.await(timeout, unit);
-        } finally {
-            waiter--;
-            waitLock.unlock();
-        }
-    }
-
-    private void announceAllAvailableConnection() {
-        if (waiter == 0)
-            return;
+    @Deprecated
+    int DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = 16;
+    /**
+     * Borrow a connection from the connection pool to execute a request. The connection pool
+     * ensures that the connection is backed by a healthy {@link Channel} for which the WebSocket
+     * handshake has already completed.
+     *
+     * @return {@link Connection} which is backed by an active {@link Channel}
+     *         and can be used to send a request.
+     *
+     * @throws TimeoutException When a connection could not be acquired in a timely manner
+     * @throws ConnectException When there is a connectivity problem with the server
+     */
+    Connection prepareConnection() throws TimeoutException, ConnectException;
 
-        waitLock.lock();
-        try {
-            hasAvailableConnection.signalAll();
-        } finally {
-            waitLock.unlock();
-        }
-    }
+    /**
+     * Get all the {@link Channel}s which are currently in-use.
+     */
+    ChannelGroup getActiveChannels();
 
-    public String getPoolInfo() {
-        final StringBuilder sb = new StringBuilder("ConnectionPool (");
-        sb.append(host);
-        sb.append(") - ");
-        connections.forEach(c -> {
-            sb.append(c);
-            sb.append(",");
-        });
-        return sb.toString().trim();
-    }
+    /**
+     * Release the connection and its associated resources (such as the channel) so that they can be re-used.
+     */
+    CompletableFuture<Void> releaseConnection(Connection conn);
+
+    /**
+     * Close the connection pool and all associated resources gracefully.
+     * Implementations must be idempotent and thread-safe.
+     */
+    CompletableFuture<Void> closeAsync();
 
-    @Override
-    public String toString() {
-        return poolLabel;
-    }
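+    /**
+     * @return An executor for scheduling background tasks (the {@link Cluster}'s executor in the
+     *         default implementation).
+     */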
+    ScheduledExecutorService executor();
+
+    /**
+     * @return {@link Host} associated with the connection pool
+     */
+    Host getHost();
+
+    /**
+     * @return {@link Cluster} containing the {@link Host}
+     */
+    Cluster getCluster();
 }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java
new file mode 100644
index 0000000..63cab7e
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.pool.ChannelHealthChecker;
+import io.netty.channel.pool.ChannelPoolHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This connection pool combines two entities: the underlying Netty channel pool, and the
+ * {@link Connection} whose lifetime is tied to a single request.
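+ *
+ * A pool is obtained through the static factory (a sketch):
+ * <pre>{@code
+ * final ConnectionPool pool = ConnectionPoolImpl.create(host, cluster);
+ * }</pre>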
+ */
+public class ConnectionPoolImpl implements ConnectionPool {
+    private static final Logger logger = LoggerFactory.getLogger(ConnectionPoolImpl.class);
+    private final Host host;
+    private final Cluster cluster;
+    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>(null);
+
+    /**
+     * Netty's implementation of channel management with an upper bound. This connection
+     * pool is responsible for attaching a channel with each request.
+     */
+    private TinkerpopFixedChannelPool channelPool;
+    /**
+     * Channel initializer that is safe to be re-used across multiple channels.
+     */
+    private Channelizer channelizer;
+
+    /**
+     * Keeps track of all the active channels. Closed channels are automatically removed
+     * from the group.
+     */
+    private final ChannelGroup activeChannels;
+
+    /**
+     * Creates and initializes the connection pool.
+     *
+     * @return A connection pool which has initialized its internal implementation.
+     */
+    public static ConnectionPool create(final Host host, final Cluster cluster) {
+        final ConnectionPoolImpl connPool = new ConnectionPoolImpl(host, cluster);
+        connPool.init();
+
+        logger.info("Created {}", connPool);
+
+        return connPool;
+    }
+
+    private ConnectionPoolImpl(final Host host, final Cluster cluster) {
+        this.host = host;
+        this.cluster = cluster;
+        this.activeChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+    }
+
+    private void init() {
+        Class<Channelizer> channelizerClass = null;
+        try {
+            channelizerClass = (Class<Channelizer>) Class.forName(cluster.connectionPoolSettings().channelizer);
+            this.channelizer = channelizerClass.newInstance();
+            this.channelizer.init(this);
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+            throw new IllegalArgumentException("Error while initializing channelizer " +
+                                                       (channelizerClass != null ? channelizerClass : "NULL"), e);
+        }
+
+        final Bootstrap b = cluster.getFactory().createBootstrap();
+        b.remoteAddress(host.getHostUri().getHost(), host.getHostUri().getPort());
+        // TODO: Use Epoll if available
+        b.channel(NioSocketChannel.class);
+
+        final ChannelPoolHandler handler = new ChannelPoolHandler() {
+            @Override
+            public void channelReleased(final Channel ch) {
+                // Note: Any operation performed here might have direct impact on the performance of the
+                // client since this method is called for every new request.
+                logger.debug("Channel released: {}", ch);
+            }
+
+            @Override
+            public void channelAcquired(final Channel ch) {
+                // Note: Any operation performed here might have direct impact on the performance of the
+                // client since this method is called for every new request.
+                logger.debug("Channel acquired: {}", ch);
+            }
+
+            @Override
+            public void channelCreated(final Channel ch) {
+                logger.debug("Channel created: {}", ch);
+                // Guaranteed that it is a socket channel because we set b.channel as SocketChannel
+                final SocketChannel sch = (SocketChannel) ch;
+                ((Channelizer.AbstractChannelizer) channelizer).initChannel(sch);
+            }
+        };
+
+        this.channelPool = createChannelPool(b, cluster.connectionPoolSettings(), handler);
+
+        logger.debug("Initialized {} successfully.", this);
+    }
+
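+    /**
+     * Builds the underlying Netty channel pool. With {@code AcquireTimeoutAction.FAIL} and a
+     * single pending acquire allowed, an acquire that cannot be served a channel within
+     * {@code maxWaitForConnection} milliseconds fails fast rather than queueing up indefinitely.
+     */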
+    private TinkerpopFixedChannelPool createChannelPool(final Bootstrap b,
+                                                        final Settings.ConnectionPoolSettings connectionPoolSettings,
+                                                        final ChannelPoolHandler handler) {
+        return new TinkerpopFixedChannelPool(b,
+                                             handler,
+                                             ChannelHealthChecker.ACTIVE,
+                                             TinkerpopFixedChannelPool.AcquireTimeoutAction.FAIL, // throw an exception on acquire timeout
+                                             connectionPoolSettings.maxWaitForConnection,
+                                             calculateMaxPoolSize(connectionPoolSettings), /*maxConnections*/
+                                             1, /*maxPendingAcquires*/
+                                             true);/*releaseHealthCheck*/
+    }
+
+    @Override
+    public ChannelGroup getActiveChannels() {
+        return this.activeChannels;
+    }
+
+    @Override
+    public CompletableFuture<Void> releaseConnection(final Connection connection) {
+        final Channel channelAssociatedToConnection = connection.getChannel();
+
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        final Promise<Void> promise = channelAssociatedToConnection.newPromise().addListener(f -> {
+            if (f.isSuccess()) {
+                future.complete(null);
+            } else {
+                future.completeExceptionally(f.cause());
+            }
+        });
+
+        this.channelPool.release(channelAssociatedToConnection, promise);
+
+        return future;
+    }
+
+    @Override
+    public synchronized CompletableFuture<Void> closeAsync() {
+        if (closeFuture.get() != null) return closeFuture.get(); // Make this API idempotent
+
+        final CompletableFuture<Void> activeChannelClosedFuture = CompletableFuture.runAsync(() -> {
+            logger.info("Closing active channels borrowed from ChannelPool [BusyConnectionCount={}]", this.getActiveChannels().size());
+            this.activeChannels.close().syncUninterruptibly();
+            logger.debug("Closed all active channels.");
+        });
+
+        final CompletableFuture<Void> channelPoolClosedFuture = new CompletableFuture<>();
+        this.channelPool.closeAsync().addListener((f) -> {
+            if (f.isSuccess()) {
+                logger.debug("Closed underlying ChannelPool {}", this.channelPool);
+                channelPoolClosedFuture.complete(null);
+            } else {
+                logger.error("ChannelPool did not close gracefully", f.cause());
+                channelPoolClosedFuture.completeExceptionally(f.cause());
+            }
+        });
+
+        closeFuture.set(CompletableFuture.allOf(channelPoolClosedFuture, activeChannelClosedFuture)
+                                         .thenRun(() -> logger.info("Closed {}", this)));
+
+        return closeFuture.get();
+    }
+
+    @Override
+    public ScheduledExecutorService executor() {
+        return this.cluster.executor();
+    }
+
+    @Override
+    public Host getHost() {
+        return this.host;
+    }
+
+    @Override
+    public Connection prepareConnection() throws TimeoutException, ConnectException {
+        if (closeFuture.get() != null) {
+            throw new IllegalStateException(this + " is closing. Cannot borrow connection.");
+        }
+
+        // Get a channel, verify handshake is done and then attach it to a connectionPool
+        final Channel ch = this.channelPool.acquire().syncUninterruptibly().getNow();
+
+        // TODO: This call is unnecessary on every channel acquire since the handshake is done only once.
+        channelizer.connected(ch);
+
+        return new SingleRequestConnection(ch, this);
+    }
+
+    /**
+     * Calculates the max size of the channel pool. To maintain backward compatibility
+     * it is calculated as a function of maxInProcessPerConnection and
+     * maxSimultaneousUsagePerConnection. In a future version, when backward compatibility
+     * is no longer required, it should be equal to
+     * {@link Settings.ConnectionPoolSettings#maxSize}.
+    int calculateMaxPoolSize(final Settings.ConnectionPoolSettings connectionPoolSettings) {
+        if (connectionPoolSettings.maxSimultaneousUsagePerConnection != 0 || connectionPoolSettings.maxInProcessPerConnection != 0) {
+            return connectionPoolSettings.maxSize * Math.max(connectionPoolSettings.maxSimultaneousUsagePerConnection, connectionPoolSettings.maxInProcessPerConnection);
+        } else {
+            return connectionPoolSettings.maxSize;
+        }
+    }
+
+    @Override
+    public Cluster getCluster() {
+        return this.cluster;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("ConnectionPool{closing=%s, host=%s, BusyConnectionCount=%d}",
+                             (closeFuture.get() != null),
+                             host,
+                             this.channelPool.acquiredChannelCount());
+    }
+}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index 40231ec..7b09cc7 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -18,32 +18,24 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
 import io.netty.util.AttributeMap;
+import io.netty.util.ReferenceCountUtil;
+
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.util.Attribute;
-import io.netty.util.AttributeKey;
-import io.netty.util.ReferenceCountUtil;
 import org.apache.tinkerpop.gremlin.driver.ser.SerializationException;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.security.PrivilegedActionException;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-
 import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -55,6 +47,17 @@ import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+
 /**
  * Holder for internal handler classes used in constructing the channel pipeline.
  *
@@ -69,7 +72,9 @@ final class Handler {
         private static final Logger logger = LoggerFactory.getLogger(GremlinSaslAuthenticationHandler.class);
         private static final AttributeKey<Subject> subjectKey = AttributeKey.valueOf("subject");
         private static final AttributeKey<SaslClient> saslClientKey = AttributeKey.valueOf("saslclient");
-        private static final Map<String, String> SASL_PROPERTIES = new HashMap<String, String>() {{ put(Sasl.SERVER_AUTH, "true"); }};
+        private static final Map<String, String> SASL_PROPERTIES = new HashMap<String, String>() {{
+            put(Sasl.SERVER_AUTH, "true");
+        }};
         private static final byte[] NULL_CHALLENGE = new byte[0];
 
         private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
@@ -192,36 +197,29 @@ final class Handler {
     }
 
     /**
-     * Takes a map of requests pending responses and writes responses to the {@link ResultQueue} of a request
-     * as the {@link ResponseMessage} objects are deserialized.
+     * Handles all responses from the server (including channel exceptions) and writes responses to
+     * the {@link ResultQueue} of a request as the {@link ResponseMessage} objects are deserialized.
      */
+    @ChannelHandler.Sharable
     static class GremlinResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> {
         private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class);
-        private final ConcurrentMap<UUID, ResultQueue> pending;
-
-        public GremlinResponseHandler(final ConcurrentMap<UUID, ResultQueue> pending) {
-            this.pending = pending;
-        }
 
         @Override
-        public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
-            // occurs when the server shuts down in a disorderly fashion, otherwise in an orderly shutdown the server
-            // should fire off a close message which will properly release the driver.
-            super.channelInactive(ctx);
-
+        public void channelInactive(final ChannelHandlerContext ctx) {
             // the channel isn't going to get anymore results as it is closed so release all pending requests
-            pending.values().forEach(val -> val.markError(new IllegalStateException("Connection to server is no longer active")));
-            pending.clear();
+            getResultQueueAttachedToChannel(ctx)
+                    .filter(isResultQueueActive)
+                    .ifPresent((rq) -> rq.markError(new IOException("Connection " + ctx.channel() + " to server is no longer active")));
         }
 
         @Override
-        protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception {
+        protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) {
             try {
-                final ResponseStatusCode statusCode = response.getStatus().getCode();
-                final ResultQueue queue = pending.get(response.getRequestId());
-                if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.PARTIAL_CONTENT) {
-                    final Object data = response.getResult().getData();
-                    final Map<String,Object> meta = response.getResult().getMeta();
+                this.getResultQueueAttachedToChannel(channelHandlerContext).ifPresent(queue -> {
+                    final ResponseStatusCode statusCode = response.getStatus().getCode();
+                    if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.PARTIAL_CONTENT) {
+                        final Object data = response.getResult().getData();
+                        final Map<String, Object> meta = response.getResult().getMeta();
 
                     // this is a "result" from the server which is either the result of a script or a
                     // serialized traversal
@@ -246,10 +244,11 @@ final class Handler {
                     }
                 }
 
-                // as this is a non-PARTIAL_CONTENT code - the stream is done.
-                if (statusCode != ResponseStatusCode.PARTIAL_CONTENT) {
-                    pending.remove(response.getRequestId()).markComplete(response.getStatus().getAttributes());
-                }
+                    // as this is a non-PARTIAL_CONTENT code - the stream is done.
+                    if (statusCode != ResponseStatusCode.PARTIAL_CONTENT) {
+                        queue.markComplete(response.getStatus().getAttributes());
+                    }
+                });
             } finally {
                 // in the event of an exception above, it is rethrown and handled by whatever channel pipeline
                 // error handling is in play.
@@ -258,29 +257,37 @@ final class Handler {
         }
 
         @Override
-        public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
-            // if this happens enough times (like the client is unable to deserialize a response) the pending
-            // messages queue will not clear.  wonder if there is some way to cope with that.  of course, if
-            // there are that many failures someone would take notice and hopefully stop the client.
-            logger.error("Could not process the response", cause);
+        public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+            if (logger.isDebugEnabled())
+                logger.debug("Could not process the response for channel {}", ctx.channel(), cause);
 
             // the channel took an error because of something pretty bad so release all the futures out there
-            pending.values().forEach(val -> val.markError(cause));
-            pending.clear();
+            getResultQueueAttachedToChannel(ctx).filter(isResultQueueActive).ifPresent(rq -> rq.markError(cause));
 
             // serialization exceptions should not close the channel - that's worth a retry
             if (!IteratorUtils.anyMatch(ExceptionUtils.getThrowableList(cause).iterator(), t -> t instanceof SerializationException))
                 if (ctx.channel().isActive()) ctx.close();
         }
 
-        private Map<String,Object> cleanStatusAttributes(final Map<String,Object> statusAttributes) {
-            final Map<String,Object> m = new HashMap<>();
-            statusAttributes.forEach((k,v) -> {
+        private Map<String, Object> cleanStatusAttributes(final Map<String, Object> statusAttributes) {
+            final Map<String, Object> m = new HashMap<>();
+            statusAttributes.forEach((k, v) -> {
                 if (!k.equals(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) && !k.equals(Tokens.STATUS_ATTRIBUTE_STACK_TRACE))
-                    m.put(k,v);
+                    m.put(k, v);
             });
             return m;
         }
+
+        private Optional<ResultQueue> getResultQueueAttachedToChannel(final ChannelHandlerContext ctx) {
+            if (!ctx.channel().hasAttr(SingleRequestConnection.RESULT_QUEUE_ATTRIBUTE_KEY)) {
+                return Optional.empty();
+            }
+
+            // the attribute may exist but hold null if the queue was already detached on release
+            return Optional.ofNullable(ctx.channel().attr(SingleRequestConnection.RESULT_QUEUE_ATTRIBUTE_KEY).get());
+        }
+
+        private final Predicate<ResultQueue> isResultQueueActive = rq -> !rq.isComplete();
     }
 
-}
+}
\ No newline at end of file
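
The handler above no longer keys pending responses off a shared request-id map; it resolves the single
in-flight request's ResultQueue from an attribute on the channel itself. A minimal sketch of that lookup
pattern, assuming only the Netty attribute API and the RESULT_QUEUE_ATTRIBUTE_KEY introduced in this
change (the IOException message is a placeholder):

    // sketch: resolving per-channel state via a Netty attribute, as the handler does
    final Channel ch = ctx.channel();
    if (ch.hasAttr(SingleRequestConnection.RESULT_QUEUE_ATTRIBUTE_KEY)) {
        final ResultQueue queue = ch.attr(SingleRequestConnection.RESULT_QUEUE_ATTRIBUTE_KEY).get();
        if (queue != null && !queue.isComplete()) {
            queue.markError(new IOException("channel " + ch + " closed mid-request"));
        }
    }
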
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
index 5fd2b40..de2675f 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
@@ -109,6 +109,10 @@ public final class Host {
         return hostLabel;
     }
 
+    public Cluster getCluster() {
+        return this.cluster;
+    }
+
     public static interface Listener {
         public void onAvailable(final Host host);
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index 8ce4fba..da97491 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -53,6 +53,11 @@ final class ResultQueue {
 
     private Map<String,Object> statusAttributes = null;
 
+    /**
+     * Channel processing this result queue.
+     */
+    private String channelId;
+
     public ResultQueue(final LinkedBlockingQueue<Result> resultLinkedBlockingQueue, final CompletableFuture<Void> readComplete) {
         this.resultLinkedBlockingQueue = resultLinkedBlockingQueue;
         this.readComplete = readComplete;
@@ -127,6 +132,14 @@ final class ResultQueue {
         return statusAttributes;
     }
 
+    void setChannelId(final String channelId) {
+        this.channelId = channelId;
+    }
+
+    String getChannelId() {
+        return this.channelId;
+    }
+
     /**
      * Completes the next waiting future if there is one.
      */
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index 85c74f3..0dd5842 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -189,4 +189,11 @@ public final class ResultSet implements Iterable<Result> {
             }
         };
     }
+
+    /*
+     * Only used for testing.
+     */
+    String getChannelId() {
+        return resultQueue.getChannelId();
+    }
 }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
index 0b5df84..3049ea9 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
@@ -299,76 +299,103 @@ final class Settings {
 
         /**
          * The minimum size of a connection pool for a {@link Host}. By default this is set to 2.
+         * @deprecated As of release 3.4.3, this value is ignored.
          */
-        public int minSize = ConnectionPool.MIN_POOL_SIZE;
+        @Deprecated
+        public int minSize = ConnectionPool.DEFAULT_MIN_POOL_SIZE;
 
         /**
          * The maximum size of a connection pool for a {@link Host}. By default this is set to 8.
          */
-        public int maxSize = ConnectionPool.MAX_POOL_SIZE;
+        public int maxSize = ConnectionPool.DEFAULT_MAX_POOL_SIZE;
 
         /**
          * Length of time in milliseconds to wait on an idle connection before sending a keep-alive request. This
          * setting is only relevant to {@link Channelizer} implementations that return {@code true} for
          * {@link Channelizer#supportsKeepAlive()}. Set to zero to disable this feature.
          */
-        public long keepAliveInterval = Connection.KEEP_ALIVE_INTERVAL;
+        public long keepAliveInterval = Connection.DEFAULT_KEEP_ALIVE_INTERVAL;
 
         /**
          * A connection under low use can be destroyed. This setting determines the threshold for determining when
          * that connection can be released and is defaulted to 8.
+         *
+         * @deprecated As of release 3.4.3, not replaced, this parameter is ignored.
+         * @see <a href="https://issues.apache.org/jira/browse/TINKERPOP-2205">TINKERPOP-2205</a>
          */
-        public int minSimultaneousUsagePerConnection = ConnectionPool.MIN_SIMULTANEOUS_USAGE_PER_CONNECTION;
+        @Deprecated
+        public int minSimultaneousUsagePerConnection = ConnectionPool.DEFAULT_MIN_SIMULTANEOUS_USAGE_PER_CONNECTION;
 
         /**
          * If a connection is over used, then it might mean that is necessary to expand the pool by adding a new
-         * connection.  This setting determines the threshold for a connections over use and is defaulted to 16
+         * connection.  This setting determines the threshold for a connections over use and is defaulted to 16. Set
+         * the value to 0 to disable the use of this parameter.
+         *
+         * @deprecated As of release 3.4.3, replaced by {@link ConnectionPool#DEFAULT_MAX_POOL_SIZE}.
+         * @see <a href="https://issues.apache.org/jira/browse/TINKERPOP-2205">TINKERPOP-2205</a>
          */
-        public int maxSimultaneousUsagePerConnection = ConnectionPool.MAX_SIMULTANEOUS_USAGE_PER_CONNECTION;
+        @Deprecated
+        public int maxSimultaneousUsagePerConnection = ConnectionPool.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION;
 
         /**
-         * The maximum number of requests in flight on a connection where the default is 4.
+         * The maximum number of requests in flight on a connection where the default is 4. Set the value to 0 to disable
+         * the use of this parameter.
+         *
+         * @deprecated As of release 3.4.3, replaced by {@link ConnectionPool#DEFAULT_MAX_POOL_SIZE}. For backward
+         * compatibility it is still used to approximate the amount of parallelism required. In future versions, the
+         * approximation logic will be removed and dependency on this parameter will be completely eliminated.
+         * To disable the dependency on this parameter right now, explicitly set the value of
+         * {@link Settings.ConnectionPoolSettings#maxInProcessPerConnection} and {@link Settings.ConnectionPoolSettings#maxSimultaneousUsagePerConnection}
+         * to 0.
+         *
+         * @see ConnectionPoolImpl#calculateMaxPoolSize(Settings.ConnectionPoolSettings) for approximation logic.
+         * @see <a href="https://issues.apache.org/jira/browse/TINKERPOP-2205">TINKERPOP-2205</a>
          */
-        public int maxInProcessPerConnection = Connection.MAX_IN_PROCESS;
+        @Deprecated
+        public int maxInProcessPerConnection = Connection.DEFAULT_MAX_IN_PROCESS;
 
         /**
          * A connection has available in-process requests, calculated by subtracting the number of current
          * in-flight requests on a connection from the {@link #maxInProcessPerConnection}. When
          * that number drops below this configuration setting, the connection is recommended for replacement. The
          * default for this setting is 1.
+         *
+         * @deprecated As of release 3.4.3, not replaced, this parameter is ignored.
+         * @see <a href="https://issues.apache.org/jira/browse/TINKERPOP-2205">TINKERPOP-2205</a>
          */
-        public int minInProcessPerConnection = Connection.MIN_IN_PROCESS;
+        @Deprecated
+        public int minInProcessPerConnection = Connection.DEFAULT_MIN_IN_PROCESS;
 
         /**
          * The amount of time in milliseconds to wait for a new connection before timing out where the default value
          * is 3000.
          */
-        public int maxWaitForConnection = Connection.MAX_WAIT_FOR_CONNECTION;
+        public int maxWaitForConnection = Connection.DEFAULT_MAX_WAIT_FOR_CONNECTION;
 
         /**
          * If the connection is using a "session" this setting represents the amount of time in milliseconds to wait
          * for that session to close before timing out where the default value is 3000. Note that the server will
          * eventually clean up dead sessions itself on expiration of the session or during shutdown.
          */
-        public int maxWaitForSessionClose = Connection.MAX_WAIT_FOR_SESSION_CLOSE;
+        public int maxWaitForSessionClose = Connection.DEFAULT_MAX_WAIT_FOR_SESSION_CLOSE;
 
         /**
          * The maximum length in bytes that a message can be sent to the server. This number can be no greater than
          * the setting of the same name in the server configuration. The default value is 65536.
          */
-        public int maxContentLength = Connection.MAX_CONTENT_LENGTH;
+        public int maxContentLength = Connection.DEFAULT_MAX_CONTENT_LENGTH;
 
         /**
          * The amount of time in milliseconds to wait before trying to reconnect to a dead host. The default value is
          * 1000.
          */
-        public int reconnectInterval = Connection.RECONNECT_INTERVAL;
+        public int reconnectInterval = Connection.DEFAULT_RECONNECT_INTERVAL;
 
         /**
          * The override value for the size of the result batches to be returned from the server. This value is set to
          * 64 by default.
          */
-        public int resultIterationBatchSize = Connection.RESULT_ITERATION_BATCH_SIZE;
+        public int resultIterationBatchSize = Connection.DEFAULT_RESULT_ITERATION_BATCH_SIZE;
 
         /**
          * The constructor for the channel that connects to the server. This value should be the fully qualified
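
Taken together, the deprecations above collapse pool sizing into the single maxSize/maxConnectionPoolSize
knob. A minimal sketch of configuring a driver under the new model, assuming only the builder methods
exercised in ClusterBuilderTest further down (host and sizes are placeholders):

    // sketch: opting out of the deprecated per-connection settings entirely
    final Cluster cluster = Cluster.build("localhost")
            .maxConnectionPoolSize(8)               // one request per channel, so this caps parallel requests per host
            .maxInProcessPerConnection(0)           // 0 disables the backward-compatible approximation logic
            .maxSimultaneousUsagePerConnection(0)   // likewise ignored once set to 0
            .create();
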
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/SingleRequestConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/SingleRequestConnection.java
new file mode 100644
index 0000000..6bb745d
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/SingleRequestConnection.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.AttributeKey;
+
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Connection that can be used to submit only a single request.
+ */
+public class SingleRequestConnection implements Connection {
+    /*
+     * NOTE: An instance of this class is created for every request. Keep the member variables of this class
+     * lean to avoid creating excess objects on the heap.
+     */
+    private static final Logger logger = LoggerFactory.getLogger(SingleRequestConnection.class);
+
+    private final Channel channel;
+    private final ConnectionPool pool;
+
+    private CompletableFuture<ResultSet> resultFuture;
+    private ChannelPromise requestPromise;
+
+    /**
+     * Future that indicates the release of the underlying resources (like the channel) for this connection.
+     */
+    private final AtomicReference<CompletableFuture<Void>> releaseFuture;
+
+    static final AttributeKey<ResultQueue> RESULT_QUEUE_ATTRIBUTE_KEY = AttributeKey.newInstance("resultQueueFuture");
+
+    SingleRequestConnection(final Channel channel, final ConnectionPool pool) {
+        /* A channel is attached to a request only when the channel is active. It is the responsibility
+         * of the channel pool to ensure that the channel attached to this connection is healthy; if it
+         * is not, something has gone wrong, hence the IllegalStateException.
+         */
+        if (!channel.isActive()) {
+            throw new IllegalStateException("Channel " + channel + " is not active.");
+        }
+
+        this.channel = channel;
+        this.pool = pool;
+        this.releaseFuture = new AtomicReference<>(null);
+    }
+
+    private void onChannelWriteError(final CompletableFuture<ResultSet> resultQueueSetup, final Throwable cause) {
+        if (cause instanceof ClosedChannelException) {
+            resultQueueSetup.completeExceptionally(new ConnectException("Failed to connect to the server. Check the server connectivity."));
+        } else {
+            resultQueueSetup.completeExceptionally(cause);
+        }
+
+        logger.debug("Write to the channel failed. {} may be dead. Returning to pool for replacement.", this, cause);
+
+        this.releaseResources();
+    }
+
+    private void onResultReadCompleteError(final Throwable cause) {
+        if (cause instanceof InterruptedException) {
+            logger.debug("Forcing close of {}.", this, cause);
+            // HACK: There is no good way to signal to the server that the request is complete
+            // so that it can release resources. As a last resort, close the channel itself.
+            this.forceTerminate();
+        } else if (cause instanceof ResponseException) {
+            logger.debug("Error while processing request on the server {}.", this, cause);
+            this.releaseResources();
+        } else {
+            // This could be a case when a connected channel processing a request has been closed
+            // from the client side.
+            logger.debug("Error while reading the response from the server for {}.", this, cause);
+            this.releaseResources();
+        }
+    }
+
+    private void onResultReadCompleteSuccess() {
+        // return to pool on successful completion of reading the results
+        this.releaseResources();
+    }
+
+    /**
+     * Force an abnormal close of a connection. This is accomplished by closing the underlying Netty {@link Channel}.
+     */
+    private void forceTerminate() {
+        this.channel.close();
+
+        this.channel.closeFuture().addListener(f -> {
+            if (!f.isSuccess()) {
+                logger.warn("Failed to closed channel {}. This might lead to a connection leak.", channel, f.cause());
+            }
+
+            this.pool.executor().submit(this::releaseResources);
+        });
+    }
+
+    /**
+     * Closes the connection gracefully by releasing the resources and notifying the appropriate
+     * listener. This is an idempotent API.
+     *
+     * @return Future that represents a successful release of all the resources
+     */
+    synchronized CompletableFuture<Void> releaseResources() {
+        // make this operation idempotent
+        if (this.releaseFuture.get() != null) {
+            return releaseFuture.get();
+        }
+
+        // Remove the result queue from the channel
+        if (channel.hasAttr(RESULT_QUEUE_ATTRIBUTE_KEY) && (channel.attr(RESULT_QUEUE_ATTRIBUTE_KEY).get() != null)) {
+            final ResultQueue resultQueue = channel.attr(RESULT_QUEUE_ATTRIBUTE_KEY).getAndSet(null);
+            if (!resultQueue.isComplete()) {
+                // resultQueue should have been completed, either successfully or exceptionally, by the channel handler by now.
+                resultQueue.markError(new IllegalStateException("Failed to read results. Connection to the server is closed."));
+            }
+        }
+
+        releaseFuture.compareAndSet(null, this.pool.releaseConnection(this));
+
+        return releaseFuture.get();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public ChannelPromise write(final RequestMessage requestMessage, final CompletableFuture<ResultSet> resultQueueSetup) {
+        if (this.resultFuture != null) {
+            throw new IllegalStateException("This " + this + " is already in use. Cannot reuse it for request " + requestMessage);
+        }
+
+        this.resultFuture = resultQueueSetup;
+
+        final CompletableFuture<Void> readCompleted = new CompletableFuture<>();
+        readCompleted.whenCompleteAsync((v, t) -> {
+            if (t != null) {
+                this.onResultReadCompleteError(t);
+            } else {
+                this.onResultReadCompleteSuccess();
+            }
+        }, pool.executor());
+
+        // As a further optimization, this creation could be deferred until a successful write to the server.
+        final LinkedBlockingQueue<Result> resultLinkedBlockingQueue = new LinkedBlockingQueue<>();
+        final ResultQueue queue = new ResultQueue(resultLinkedBlockingQueue, readCompleted);
+        queue.setChannelId(channel.id().asLongText());
+        if (!channel.attr(RESULT_QUEUE_ATTRIBUTE_KEY).compareAndSet(null, queue)) {
+            throw new IllegalStateException("Channel " + channel + " already has a result queue attached to it");
+        }
+
+        final SingleRequestConnection thisSingleRequestConnection = this;
+        requestPromise = channel.newPromise()
+                                .addListener(f -> {
+                                    // Delegate event handling to workers after I/O to free up EventLoopThreads
+                                    if (!f.isSuccess()) {
+                                        pool.executor().submit(() -> thisSingleRequestConnection.onChannelWriteError(resultQueueSetup, f.cause()));
+                                    } else {
+                                        // resultQueueSetup should only be completed by a worker since the application code might have sync
+                                        // completion stages attached to it, and we do not want the event loop threads to process those
+                                        // stages.
+                                        pool.executor().submit(() -> resultQueueSetup.complete(new ResultSet(queue,
+                                                                                                             pool.executor(),
+                                                                                                             readCompleted,
+                                                                                                             requestMessage,
+                                                                                                             thisSingleRequestConnection.getHost())));
+                                    }
+                                });
+
+        channel.writeAndFlush(requestMessage, requestPromise);
+
+        return requestPromise;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Channel getChannel() {
+        return this.channel;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Host getHost() {
+        return this.pool.getHost();
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Connection{host=%s,channel=%s}", pool.getHost(), channel);
+    }
+}
\ No newline at end of file
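
Since a SingleRequestConnection carries exactly one request, the calling pattern is: borrow an active
channel, wrap it, write once, and let the read-complete future drive the release back to the pool. A
minimal sketch under those assumptions (the channel and pool variables are placeholders and error
handling is elided):

    // sketch: one request per connection; a second write(...) throws IllegalStateException
    final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
            .addArg(Tokens.ARGS_GREMLIN, "g.V().count()")
            .create();
    final CompletableFuture<ResultSet> resultSetFuture = new CompletableFuture<>();
    final SingleRequestConnection connection = new SingleRequestConnection(channel, pool);
    connection.write(request, resultSetFuture);
    resultSetFuture.thenCompose(ResultSet::all)
                   .thenAccept(results -> results.forEach(System.out::println));
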
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/TinkerpopFixedChannelPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/TinkerpopFixedChannelPool.java
new file mode 100644
index 0000000..ac77b38
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/TinkerpopFixedChannelPool.java
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.pool.*;
+import io.netty.util.concurrent.*;
+import io.netty.util.internal.ObjectUtil;
+import io.netty.util.internal.ThrowableUtil;
+
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * TODO: This class is a fork of Netty's {@link FixedChannelPool}. It should be removed once
+ * https://github.com/netty/netty/pull/9226 is resolved and we start consuming the release containing
+ * the fix.
+ */
+public class TinkerpopFixedChannelPool extends SimpleChannelPool {
+    private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace(
+            new IllegalStateException("Too many outstanding acquire operations"),
+            TinkerpopFixedChannelPool.class, "acquire0(...)");
+    private static final TimeoutException TIMEOUT_EXCEPTION = ThrowableUtil.unknownStackTrace(
+            new TimeoutException("Acquire operation took longer then configured maximum time"),
+            TinkerpopFixedChannelPool.class, "<init>(...)");
+    static final IllegalStateException POOL_CLOSED_ON_RELEASE_EXCEPTION = ThrowableUtil.unknownStackTrace(
+            new IllegalStateException("FixedChannelPool was closed"),
+            TinkerpopFixedChannelPool.class, "release(...)");
+    static final IllegalStateException POOL_CLOSED_ON_ACQUIRE_EXCEPTION = ThrowableUtil.unknownStackTrace(
+            new IllegalStateException("FixedChannelPool was closed"),
+            TinkerpopFixedChannelPool.class, "acquire0(...)");
+    public enum AcquireTimeoutAction {
+        /**
+         * Create a new connection when the timeout is detected.
+         */
+        NEW,
+
+        /**
+         * Fail the {@link Future} of the acquire call with a {@link TimeoutException}.
+         */
+        FAIL
+    }
+
+    private final EventExecutor executor;
+    private final long acquireTimeoutNanos;
+    private final Runnable timeoutTask;
+
+    // There is no need to worry about synchronization as everything that modifies the queue or counts is done
+    // by the above EventExecutor.
+    private final Queue<TinkerpopFixedChannelPool.AcquireTask> pendingAcquireQueue = new ArrayDeque<TinkerpopFixedChannelPool.AcquireTask>();
+    private final int maxConnections;
+    private final int maxPendingAcquires;
+    private final AtomicInteger acquiredChannelCount = new AtomicInteger();
+    private int pendingAcquireCount;
+    private boolean closed;
+
+    /**
+     * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
+     *
+     * @param bootstrap         the {@link Bootstrap} that is used for connections
+     * @param handler           the {@link ChannelPoolHandler} that will be notified for the different pool actions
+     * @param maxConnections    the maximum number of active connections; once this is reached, new attempts to acquire
+     *                          a {@link Channel} will be delayed until a connection is returned to the pool again.
+     */
+    public TinkerpopFixedChannelPool(Bootstrap bootstrap,
+                            ChannelPoolHandler handler, int maxConnections) {
+        this(bootstrap, handler, maxConnections, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
+     *
+     * @param bootstrap             the {@link Bootstrap} that is used for connections
+     * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
+     * @param maxConnections        the maximum number of active connections; once this is reached, new attempts to
+     *                              acquire a {@link Channel} will be delayed until a connection is returned to the
+     *                              pool again.
+     * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceeded, acquire attempts
+     *                              will fail.
+     */
+    public TinkerpopFixedChannelPool(Bootstrap bootstrap,
+                            ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) {
+        this(bootstrap, handler, ChannelHealthChecker.ACTIVE, null, -1, maxConnections, maxPendingAcquires);
+    }
+
+    /**
+     * Creates a new instance.
+     *
+     * @param bootstrap             the {@link Bootstrap} that is used for connections
+     * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
+     * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
+     *                              still healthy when obtained from the {@link ChannelPool}
+     * @param action                the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} to use or {@code null} if none should be used.
+     *                              In this case {@code acquireTimeoutMillis} must be {@code -1}.
+     * @param acquireTimeoutMillis  the time (in milliseconds) after which a pending acquire must complete or
+     *                              the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} takes place.
+     * @param maxConnections        the maximum number of active connections; once this is reached, new attempts to
+     *                              acquire a {@link Channel} will be delayed until a connection is returned to the
+     *                              pool again.
+     * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceeded, acquire attempts
+     *                              will fail.
+     */
+    public TinkerpopFixedChannelPool(Bootstrap bootstrap,
+                            ChannelPoolHandler handler,
+                            ChannelHealthChecker healthCheck, TinkerpopFixedChannelPool.AcquireTimeoutAction action,
+                            final long acquireTimeoutMillis,
+                            int maxConnections, int maxPendingAcquires) {
+        this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, true);
+    }
+
+    /**
+     * Creates a new instance.
+     *
+     * @param bootstrap             the {@link Bootstrap} that is used for connections
+     * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
+     * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
+     *                              still healthy when obtained from the {@link ChannelPool}
+     * @param action                the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} to use or {@code null} if none should be used.
+     *                              In this case {@code acquireTimeoutMillis} must be {@code -1}.
+     * @param acquireTimeoutMillis  the time (in milliseconds) after which a pending acquire must complete or
+     *                              the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} takes place.
+     * @param maxConnections        the maximum number of active connections; once this is reached, new attempts to
+     *                              acquire a {@link Channel} will be delayed until a connection is returned to the
+     *                              pool again.
+     * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceeded, acquire attempts
+     *                              will fail.
+     * @param releaseHealthCheck    will check channel health before offering the channel back if this parameter is
+     *                              set to {@code true}.
+     */
+    public TinkerpopFixedChannelPool(Bootstrap bootstrap,
+                            ChannelPoolHandler handler,
+                            ChannelHealthChecker healthCheck, TinkerpopFixedChannelPool.AcquireTimeoutAction action,
+                            final long acquireTimeoutMillis,
+                            int maxConnections, int maxPendingAcquires, final boolean releaseHealthCheck) {
+        this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires,
+                releaseHealthCheck, true);
+    }
+
+    /**
+     * Creates a new instance.
+     *
+     * @param bootstrap             the {@link Bootstrap} that is used for connections
+     * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
+     * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
+     *                              still healthy when obtained from the {@link ChannelPool}
+     * @param action                the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} to use or {@code null} if none should be used.
+     *                              In this case {@code acquireTimeoutMillis} must be {@code -1}.
+     * @param acquireTimeoutMillis  the time (in milliseconds) after which a pending acquire must complete or
+     *                              the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} takes place.
+     * @param maxConnections        the maximum number of active connections; once this is reached, new attempts to
+     *                              acquire a {@link Channel} will be delayed until a connection is returned to the
+     *                              pool again.
+     * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceeded, acquire attempts
+     *                              will fail.
+     * @param releaseHealthCheck    will check channel health before offering the channel back if this parameter is
+     *                              set to {@code true}.
+     * @param lastRecentUsed        if {@code true}, {@link Channel} selection will be LIFO; if {@code false}, FIFO.
+     */
+    public TinkerpopFixedChannelPool(Bootstrap bootstrap,
+                            ChannelPoolHandler handler,
+                            ChannelHealthChecker healthCheck, TinkerpopFixedChannelPool.AcquireTimeoutAction action,
+                            final long acquireTimeoutMillis,
+                            int maxConnections, int maxPendingAcquires,
+                            boolean releaseHealthCheck, boolean lastRecentUsed) {
+        super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
+        if (maxConnections < 1) {
+            throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
+        }
+        if (maxPendingAcquires < 1) {
+            throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
+        }
+        if (action == null && acquireTimeoutMillis == -1) {
+            timeoutTask = null;
+            acquireTimeoutNanos = -1;
+        } else if (action == null && acquireTimeoutMillis != -1) {
+            throw new NullPointerException("action");
+        } else if (action != null && acquireTimeoutMillis < 0) {
+            throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
+        } else {
+            acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
+            switch (action) {
+                case FAIL:
+                    timeoutTask = new TinkerpopFixedChannelPool.TimeoutTask() {
+                        @Override
+                        public void onTimeout(TinkerpopFixedChannelPool.AcquireTask task) {
+                            // Fail the promise as we timed out.
+                            task.promise.setFailure(TIMEOUT_EXCEPTION);
+                        }
+                    };
+                    break;
+                case NEW:
+                    timeoutTask = new TinkerpopFixedChannelPool.TimeoutTask() {
+                        @Override
+                        public void onTimeout(TinkerpopFixedChannelPool.AcquireTask task) {
+                            // Increment the acquire count and delegate to super to actually acquire a Channel which will
+                            // create a new connection.
+                            task.acquired();
+
+                            TinkerpopFixedChannelPool.super.acquire(task.promise);
+                        }
+                    };
+                    break;
+                default:
+                    throw new Error();
+            }
+        }
+        executor = bootstrap.config().group().next();
+        this.maxConnections = maxConnections;
+        this.maxPendingAcquires = maxPendingAcquires;
+    }
+
+    /** Returns the number of acquired channels that this pool thinks it has. */
+    public int acquiredChannelCount() {
+        return acquiredChannelCount.get();
+    }
+
+    @Override
+    public Future<Channel> acquire(final Promise<Channel> promise) {
+        try {
+            if (executor.inEventLoop()) {
+                acquire0(promise);
+            } else {
+                executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        acquire0(promise);
+                    }
+                });
+            }
+        } catch (Throwable cause) {
+            promise.setFailure(cause);
+        }
+        return promise;
+    }
+
+    private void acquire0(final Promise<Channel> promise) {
+        assert executor.inEventLoop();
+
+        if (closed) {
+            promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
+            return;
+        }
+        if (acquiredChannelCount.get() < maxConnections) {
+            assert acquiredChannelCount.get() >= 0;
+
+            // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
+            // EventLoop
+            Promise<Channel> p = executor.newPromise();
+            TinkerpopFixedChannelPool.AcquireListener l = new TinkerpopFixedChannelPool.AcquireListener(promise);
+            l.acquired();
+            p.addListener(l);
+            super.acquire(p);
+        } else {
+            if (pendingAcquireCount >= maxPendingAcquires) {
+                promise.setFailure(FULL_EXCEPTION);
+            } else {
+                TinkerpopFixedChannelPool.AcquireTask task = new TinkerpopFixedChannelPool.AcquireTask(promise);
+                if (pendingAcquireQueue.offer(task)) {
+                    ++pendingAcquireCount;
+
+                    if (timeoutTask != null) {
+                        task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
+                    }
+                } else {
+                    promise.setFailure(FULL_EXCEPTION);
+                }
+            }
+
+            assert pendingAcquireCount > 0;
+        }
+    }
+
+    @Override
+    public Future<Void> release(final Channel channel, final Promise<Void> promise) {
+        ObjectUtil.checkNotNull(promise, "promise");
+        final Promise<Void> p = executor.newPromise();
+        super.release(channel, p.addListener(new FutureListener<Void>() {
+
+            @Override
+            public void operationComplete(Future<Void> future) throws Exception {
+                assert executor.inEventLoop();
+
+                if (closed) {
+                    // Since the pool is closed, we have no choice but to close the channel
+                    channel.close();
+                    promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION);
+                    return;
+                }
+
+                if (future.isSuccess()) {
+                    decrementAndRunTaskQueue();
+                    promise.setSuccess(null);
+                } else {
+                    Throwable cause = future.cause();
+                    // Check if the exception was not caused by passing the Channel to the wrong pool.
+                    if (!(cause instanceof IllegalArgumentException)) {
+                        decrementAndRunTaskQueue();
+                    }
+                    promise.setFailure(future.cause());
+                }
+            }
+        }));
+        return promise;
+    }
+
+    private void decrementAndRunTaskQueue() {
+        // We should never have a negative value.
+        int currentCount = acquiredChannelCount.decrementAndGet();
+        assert currentCount >= 0;
+
+        // Run the pending acquire tasks before notifying the original promise so that if the user
+        // tries to acquire again from the ChannelFutureListener while the pendingAcquireCount is >=
+        // maxPendingAcquires, we may be able to run some pending tasks first and so allow more to
+        // be added.
+        runTaskQueue();
+    }
+
+    private void runTaskQueue() {
+        while (acquiredChannelCount.get() < maxConnections) {
+            TinkerpopFixedChannelPool.AcquireTask task = pendingAcquireQueue.poll();
+            if (task == null) {
+                break;
+            }
+
+            // Cancel the timeout if one was scheduled
+            ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
+            if (timeoutFuture != null) {
+                timeoutFuture.cancel(false);
+            }
+
+            --pendingAcquireCount;
+            task.acquired();
+
+            super.acquire(task.promise);
+        }
+
+        // We should never have a negative value.
+        assert pendingAcquireCount >= 0;
+        assert acquiredChannelCount.get() >= 0;
+    }
+
+    // AcquireTask extends AcquireListener to reduce object creations and so GC pressure
+    private final class AcquireTask extends TinkerpopFixedChannelPool.AcquireListener {
+        final Promise<Channel> promise;
+        final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos;
+        ScheduledFuture<?> timeoutFuture;
+
+        public AcquireTask(Promise<Channel> promise) {
+            super(promise);
+            // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
+            // EventLoop.
+            this.promise = executor.<Channel>newPromise().addListener(this);
+        }
+    }
+
+    private abstract class TimeoutTask implements Runnable {
+        @Override
+        public final void run() {
+            assert executor.inEventLoop();
+            long nanoTime = System.nanoTime();
+            for (;;) {
+                TinkerpopFixedChannelPool.AcquireTask task = pendingAcquireQueue.peek();
+                // Compare nanoTime as described in the javadocs of System.nanoTime()
+                //
+                // See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime()
+                // See https://github.com/netty/netty/issues/3705
+                if (task == null || nanoTime - task.expireNanoTime < 0) {
+                    break;
+                }
+                pendingAcquireQueue.remove();
+
+                --pendingAcquireCount;
+                onTimeout(task);
+            }
+        }
+
+        public abstract void onTimeout(TinkerpopFixedChannelPool.AcquireTask task);
+    }
+
+    private class AcquireListener implements FutureListener<Channel> {
+        private final Promise<Channel> originalPromise;
+        protected boolean acquired;
+
+        AcquireListener(Promise<Channel> originalPromise) {
+            this.originalPromise = originalPromise;
+        }
+
+        @Override
+        public void operationComplete(Future<Channel> future) throws Exception {
+            assert executor.inEventLoop();
+
+            if (closed) {
+                if (future.isSuccess()) {
+                    // Since the pool is closed, we have no choice but to close the channel
+                    future.getNow().close();
+                }
+                originalPromise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
+                return;
+            }
+
+            if (future.isSuccess()) {
+                originalPromise.setSuccess(future.getNow());
+            } else {
+                if (acquired) {
+                    decrementAndRunTaskQueue();
+                } else {
+                    runTaskQueue();
+                }
+
+                originalPromise.setFailure(future.cause());
+            }
+        }
+
+        public void acquired() {
+            if (acquired) {
+                return;
+            }
+            acquiredChannelCount.incrementAndGet();
+            acquired = true;
+        }
+    }
+
+    /**
+     * Closes the pool in an async manner.
+     *
+     * @return Future which represents completion of the close task
+     */
+    public Future<Void> closeAsync() {
+        if (executor.inEventLoop()) {
+            return close0();
+        } else {
+            final Promise<Void> closeComplete = executor.newPromise();
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    close0().addListener(new FutureListener<Void>() {
+                        @Override
+                        public void operationComplete(Future<Void> f) throws Exception {
+                            if (f.isSuccess()) {
+                                closeComplete.setSuccess(null);
+                            } else {
+                                closeComplete.setFailure(f.cause());
+                            }
+                        }
+                    });
+                }
+            });
+            return closeComplete;
+        }
+    }
+
+    private Future<Void> close0() {
+        assert executor.inEventLoop();
+
+        if (!closed) {
+            closed = true;
+            for (;;) {
+                TinkerpopFixedChannelPool.AcquireTask task = pendingAcquireQueue.poll();
+                if (task == null) {
+                    break;
+                }
+                ScheduledFuture<?> f = task.timeoutFuture;
+                if (f != null) {
+                    f.cancel(false);
+                }
+                task.promise.setFailure(new ClosedChannelException());
+            }
+            acquiredChannelCount.set(0);
+            pendingAcquireCount = 0;
+
+            // Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need
+            // to ensure we will not block in a EventExecutor.
+            return GlobalEventExecutor.INSTANCE.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    TinkerpopFixedChannelPool.super.close();
+                    return null;
+                }
+            });
+        }
+
+        return GlobalEventExecutor.INSTANCE.newSucceededFuture(null);
+    }
+}
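
The fork keeps FixedChannelPool's contract: acquire() yields a future for a healthy channel, every
acquired channel must be released exactly once so that a slot frees up and the pending-acquire queue
drains, and closeAsync() fails whatever is still queued. A minimal sketch of that cycle, assuming a
configured Bootstrap and ChannelPoolHandler (the sizes and timeout are placeholders):

    // sketch: bounded acquire/release against the forked pool
    final TinkerpopFixedChannelPool channelPool = new TinkerpopFixedChannelPool(
            bootstrap, poolHandler, ChannelHealthChecker.ACTIVE,
            TinkerpopFixedChannelPool.AcquireTimeoutAction.FAIL, 3000 /* acquireTimeoutMillis */,
            8 /* maxConnections */, 256 /* maxPendingAcquires */);
    channelPool.acquire().addListener((FutureListener<Channel>) f -> {
        if (f.isSuccess()) {
            final Channel ch = f.getNow();
            // ... exactly one request/response exchange on ch ...
            channelPool.release(ch);    // frees a slot and runs the pending-acquire queue
        }
    });
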
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 4f43afe..1d6d92f 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -58,7 +58,12 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
 
     @Override
     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
-        handshaker.handshake(ctx.channel());
+        handshaker.handshake(ctx.channel()).addListener(f -> {
+            if (!f.isSuccess()) {
+                if (!handshakeFuture.isDone()) handshakeFuture.setFailure(f.cause());
+                ctx.fireExceptionCaught(f.cause());
+            }
+        });
     }
 
     @Override
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketIdleEventHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketIdleEventHandler.java
new file mode 100644
index 0000000..2fb6df3
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketIdleEventHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ChannelHandler.Sharable
+public class WebSocketIdleEventHandler extends ChannelDuplexHandler {
+    private static final Logger logger = LoggerFactory.getLogger(WebSocketIdleEventHandler.class);
+    private final ChannelGroup activeChannels;
+
+    public WebSocketIdleEventHandler(final ChannelGroup activeChannels) {
+        this.activeChannels = activeChannels;
+    }
+
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+        activeChannels.add(ctx.channel());
+        super.channelActive(ctx);
+    }
+
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, final Object event) throws Exception {
+        if (event instanceof IdleStateEvent) {
+            final IdleStateEvent e = (IdleStateEvent) event;
+            if (e.state() == IdleState.READER_IDLE) {
+                logger.warn("Server {} has been idle for too long.", ctx.channel());
+            } else if (e.state() == IdleState.WRITER_IDLE || e.state() == IdleState.ALL_IDLE) {
+                logger.info("Sending ping frame to the server");
+                ctx.writeAndFlush(new PingWebSocketFrame());
+            }
+        } else {
+            // pass other user events (e.g. handshake notifications) down the pipeline
+            super.userEventTriggered(ctx, event);
+        }
+    }
+}
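
WebSocketIdleEventHandler only reacts to IdleStateEvents, so it has to sit behind a Netty
IdleStateHandler that generates them. A minimal sketch of the expected wiring, assuming the
keepAliveInterval setting shown earlier (the exact pipeline order in the Channelizer is not shown in
this diff):

    // sketch: emit idle events, then translate writer/all-idle events into WebSocket pings
    final ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new IdleStateHandler(0, keepAliveInterval, 0, TimeUnit.MILLISECONDS));
    pipeline.addLast(new WebSocketIdleEventHandler(activeChannels));
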
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java
new file mode 100644
index 0000000..eb16c60
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.util.AttributeKey;
+
+/**
+ * This handler ensures that WebSocket connections are closed gracefully on the remote server
+ * on close of a {@link Channel} by sending a CloseWebSocketFrame to the server.
+ * <p>
+ * This handler is also idempotent and sends out the CloseFrame only once.
+ */
+public class WebsocketCloseHandler extends ChannelOutboundHandlerAdapter {
+    private static final AttributeKey<Boolean> CLOSE_WS_SENT = AttributeKey.newInstance("closeWebSocketSent");
+
+    @Override
+    public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) {
+        if (ctx.channel().isActive() && ctx.channel().attr(CLOSE_WS_SENT).compareAndSet(Boolean.FALSE, Boolean.TRUE)) {
+            ctx.channel().writeAndFlush(new CloseWebSocketFrame(1000, "Client is closing the channel."), promise)
+               .addListener(ChannelFutureListener.CLOSE);
+            ctx.pipeline().remove(this);
+        } else {
+            // the close frame was already sent or the channel is inactive, so just propagate the close
+            ctx.close(promise);
+        }
+    }
+
+    @Override
+    public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
+        ctx.channel().attr(CLOSE_WS_SENT).setIfAbsent(Boolean.FALSE);
+        super.handlerAdded(ctx);
+    }
+}
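
Because WebsocketCloseHandler is an outbound handler, any close initiated on the channel passes through
it first. A minimal sketch of the effect, assuming the handler is installed in the pipeline:

    // sketch: a plain close now performs a graceful WebSocket close, exactly once
    channel.pipeline().addLast(new WebsocketCloseHandler());
    channel.close();    // writes CloseWebSocketFrame(1000, ...), then closes the channel
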
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
index 4fb950c..93fe727 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
@@ -42,6 +42,7 @@ public abstract class AbstractClient implements SimpleClient {
 
     public AbstractClient(final String threadPattern) {
         final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern(threadPattern).build();
+        // TODO: Use Epoll if available
         group = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), threadFactory);
     }
 
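
For the TODO above, Netty exposes a runtime check for native epoll support, so the group selection
could eventually look something like the following sketch (the epoll classes are an assumption about a
future implementation, not part of this change):

    // sketch: prefer the native epoll transport when the platform provides it
    final EventLoopGroup group = Epoll.isAvailable()
            ? new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors(), threadFactory)
            : new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), threadFactory);
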
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClusterBuilderTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClusterBuilderTest.java
index cb5c469..81290a9 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClusterBuilderTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClusterBuilderTest.java
@@ -38,19 +38,10 @@ public class ClusterBuilderTest {
     @Parameterized.Parameters(name = "{0}")
     public static Iterable<Object[]> data() {
         return Arrays.asList(new Object[][]{
-                {"maxInProcessPerConnection0", Cluster.build().maxInProcessPerConnection(0), "maxInProcessPerConnection must be greater than zero"},
-                {"maxInProcessPerConnectionNeg1", Cluster.build().maxInProcessPerConnection(-1), "maxInProcessPerConnection must be greater than zero"},
-                {"minInProcessPerConnectionNeg1", Cluster.build().minInProcessPerConnection(-1), "minInProcessPerConnection must be greater than or equal to zero"},
-                {"minInProcessPerConnectionLtMax", Cluster.build().minInProcessPerConnection(100).maxInProcessPerConnection(99), "maxInProcessPerConnection cannot be less than minInProcessPerConnection"},
-                {"maxSimultaneousUsagePerConnection0", Cluster.build().maxSimultaneousUsagePerConnection(0), "maxSimultaneousUsagePerConnection must be greater than zero"},
-                {"maxSimultaneousUsagePerConnectionNeg1", Cluster.build().maxSimultaneousUsagePerConnection(-1), "maxSimultaneousUsagePerConnection must be greater than zero"},
-                {"minSimultaneousUsagePerConnectionNeg1", Cluster.build().minSimultaneousUsagePerConnection(-1), "minSimultaneousUsagePerConnection must be greater than or equal to zero"},
-                {"minSimultaneousUsagePerConnectionLtMax", Cluster.build().minSimultaneousUsagePerConnection(100).maxSimultaneousUsagePerConnection(99), "maxSimultaneousUsagePerConnection cannot be less than minSimultaneousUsagePerConnection"},
+                {"maxInProcessPerConnectionNeg1", Cluster.build().maxInProcessPerConnection(-1), "maxInProcessPerConnection must be greater than equal to zero"},
+                {"maxSimultaneousUsagePerConnectionNeg1", Cluster.build().maxSimultaneousUsagePerConnection(-1), "maxSimultaneousUsagePerConnection must be greater than equal to zero"},
                 {"maxConnectionPoolSize0", Cluster.build().maxConnectionPoolSize(0), "maxConnectionPoolSize must be greater than zero"},
                 {"maxConnectionPoolSizeNeg1", Cluster.build().maxConnectionPoolSize(-1), "maxConnectionPoolSize must be greater than zero"},
-                {"minConnectionPoolSizeNeg1", Cluster.build().minConnectionPoolSize(-1), "minConnectionPoolSize must be greater than or equal to zero"},
-                {"minConnectionPoolSizeLteMax", Cluster.build().minConnectionPoolSize(100).maxConnectionPoolSize(99), "maxConnectionPoolSize cannot be less than minConnectionPoolSize"},
-                {"minConnectionPoolSizeLteMax", Cluster.build().minConnectionPoolSize(100).maxConnectionPoolSize(99), "maxConnectionPoolSize cannot be less than minConnectionPoolSize"},
                 {"maxConnectionPoolSize0", Cluster.build().maxWaitForConnection(0), "maxWaitForConnection must be greater than zero"},
                 {"maxWaitForSessionClose0", Cluster.build().maxWaitForSessionClose(0), "maxWaitForSessionClose must be greater than zero"},
                 {"maxWaitForSessionCloseNeg1", Cluster.build().maxWaitForSessionClose(-1), "maxWaitForSessionClose must be greater than zero"},
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
deleted file mode 100644
index 91d14af..0000000
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.driver;
-
-import io.netty.handler.codec.CorruptedFrameException;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
-import org.apache.tinkerpop.gremlin.server.AbstractGremlinServerIntegrationTest;
-import org.apache.tinkerpop.gremlin.server.TestClientFactory;
-import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrationTest {
-    private Log4jRecordingAppender recordingAppender = null;
-    private Level previousLogLevel;
-
-    @Before
-    public void setupForEachTest() {
-        recordingAppender = new Log4jRecordingAppender();
-        final Logger rootLogger = Logger.getRootLogger();
-
-        if (name.getMethodName().equals("shouldCloseConnectionDeadDueToUnRecoverableError")) {
-            final org.apache.log4j.Logger connectionLogger = org.apache.log4j.Logger.getLogger(Connection.class);
-            previousLogLevel = connectionLogger.getLevel();
-            connectionLogger.setLevel(Level.DEBUG);
-        }
-
-        rootLogger.addAppender(recordingAppender);
-    }
-
-    @After
-    public void teardownForEachTest() {
-        final Logger rootLogger = Logger.getRootLogger();
-
-        if (name.getMethodName().equals("shouldCloseConnectionDeadDueToUnRecoverableError")) {
-            final org.apache.log4j.Logger connectionLogger = org.apache.log4j.Logger.getLogger(Connection.class);
-            connectionLogger.setLevel(previousLogLevel);
-        }
-
-        rootLogger.removeAppender(recordingAppender);
-    }
-
-    /**
-     * Reproducer for TINKERPOP-2169
-     */
-    @Test
-    public void shouldCloseConnectionDeadDueToUnRecoverableError() throws Exception {
-        // Set a low value of maxContentLength to intentionally trigger CorruptedFrameException
-        final Cluster cluster = TestClientFactory.build()
-                                                 .serializer(Serializers.GRYO_V3D0)
-                                                 .maxContentLength(64)
-                                                 .minConnectionPoolSize(1)
-                                                 .maxConnectionPoolSize(2)
-                                                 .create();
-        final Client.ClusteredClient client = cluster.connect();
-
-        try {
-            // Add the test data so that the g.V() response could exceed maxContentLength
-            client.submit("g.inject(1).repeat(__.addV()).times(10).count()").all().get();
-            try {
-                client.submit("g.V().fold()").all().get();
-
-                fail("Should throw an exception.");
-            } catch (Exception re) {
-                assertThat(re.getCause() instanceof CorruptedFrameException, is(true));
-            }
-
-            // Assert that the host has not been marked unavailable
-            assertEquals(1, cluster.availableHosts().size());
-
-            // Assert that there is no connection leak and all connections have been closed
-            assertEquals(0, client.hostConnectionPools.values().stream()
-                                                             .findFirst().get()
-                                                             .numConnectionsWaitingToCleanup());
-        } finally {
-            cluster.close();
-        }
-
-        // Assert that the connection has been destroyed. Specifically check for the string with
-        // isDead=true indicating the connection that was closed due to CorruptedFrameException.
-        assertThat(recordingAppender.logContainsAny("^(?!.*(isDead=false)).*isDead=true.*destroyed successfully.$"), is(true));
-
-    }
-}
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientSingleRequestConnectionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientSingleRequestConnectionIntegrateTest.java
new file mode 100644
index 0000000..dfbdb76
--- /dev/null
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientSingleRequestConnectionIntegrateTest.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import io.netty.handler.codec.CorruptedFrameException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.server.AbstractGremlinServerIntegrationTest;
+import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.TestClientFactory;
+import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ClientSingleRequestConnectionIntegrateTest extends AbstractGremlinServerIntegrationTest {
+
+    private Log4jRecordingAppender recordingAppender = null;
+    private Level previousLogLevel;
+
+    @Before
+    public void setupForEachTest() {
+        recordingAppender = new Log4jRecordingAppender();
+        final Logger rootLogger = Logger.getRootLogger();
+
+        if (name.getMethodName().equals("shouldRecoverFromConnectionCloseDueToUnRecoverableError")) {
+            final org.apache.log4j.Logger connectionLogger = org.apache.log4j.Logger.getLogger(SingleRequestConnection.class);
+            previousLogLevel = connectionLogger.getLevel();
+            connectionLogger.setLevel(Level.DEBUG);
+        }
+
+        if (name.getMethodName().equals("testConnectionReleaseOnResultSetClose")) {
+            final org.apache.log4j.Logger connectionLogger = org.apache.log4j.Logger.getLogger(SingleRequestConnection.class);
+            previousLogLevel = connectionLogger.getLevel();
+            connectionLogger.setLevel(Level.DEBUG);
+        }
+
+        if (name.getMethodName().equals("testGracefulClose") || name.getMethodName().equals("testAbruptClose")) {
+            final org.apache.log4j.Logger connectionPoolLogger = org.apache.log4j.Logger.getLogger(ConnectionPoolImpl.class);
+            previousLogLevel = connectionPoolLogger.getLevel();
+            connectionPoolLogger.setLevel(Level.INFO);
+        }
+
+        rootLogger.addAppender(recordingAppender);
+    }
+
+    @After
+    public void teardownForEachTest() {
+        final Logger rootLogger = Logger.getRootLogger();
+
+        if (name.getMethodName().equals("shouldRecoverFromConnectionCloseDueToUnRecoverableError")) {
+            final org.apache.log4j.Logger connectionLogger = org.apache.log4j.Logger.getLogger(SingleRequestConnection.class);
+            connectionLogger.setLevel(previousLogLevel);
+        }
+
+        if (name.getMethodName().equals("testConnectionReleaseOnResultSetClose")) {
+            final org.apache.log4j.Logger connectionLogger = org.apache.log4j.Logger.getLogger(SingleRequestConnection.class);
+            connectionLogger.setLevel(previousLogLevel);
+        }
+
+        if (name.getMethodName().equals("testGracefulClose") || name.getMethodName().equals("testAbruptClose")) {
+            final org.apache.log4j.Logger connectionPoolLogger = org.apache.log4j.Logger.getLogger(ConnectionPoolImpl.class);
+            connectionPoolLogger.setLevel(previousLogLevel);
+        }
+
+        rootLogger.removeAppender(recordingAppender);
+    }
+
+    /**
+     * Configure specific Gremlin Server settings for specific tests.
+     */
+    @Override
+    public Settings overrideSettings(final Settings settings) {
+        final String nameOfTest = name.getMethodName();
+        switch (nameOfTest) {
+            case "testClientCloseInMiddleOfResultConsumption":
+                settings.writeBufferHighWaterMark = 32;
+                settings.writeBufferLowWaterMark = 16;
+                break;
+        }
+        return settings;
+    }
+
+    /**
+     * Netty closes the channel on an unrecoverable exception such as CorruptedFrameException.
+     * This test validates the correct replacement of the channel.
+     */
+    @Test
+    public void shouldRecoverFromConnectionCloseDueToUnRecoverableError() throws Exception {
+        // Set a low value of maxContentLength to intentionally trigger CorruptedFrameException
+        final Cluster cluster = TestClientFactory.build()
+                                                 .maxContentLength(64)
+                                                 .maxConnectionPoolSize(1)
+                                                 .maxSimultaneousUsagePerConnection(0)
+                                                 .minSimultaneousUsagePerConnection(0)
+                                                 .maxInProcessPerConnection(0)
+                                                 .minInProcessPerConnection(0)
+                                                 .create();
+        final Client.ClusteredClient client = cluster.connect();
+
+        try {
+            // Add the test data so that the g.V() response could exceed maxContentLength
+            client.submit("g.inject(1).repeat(__.addV()).times(10).count()").all().get();
+
+            try {
+                client.submit("g.V().fold()").all().get();
+                fail("Should throw an exception.");
+            } catch (Exception re) {
+                assertThat(re.getCause() instanceof CorruptedFrameException, is(true));
+            }
+
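+            // allow some time for the pool to replace the channel that Netty closed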
+            Thread.sleep(2000);
+
+            // Assert that the host has not been marked unavailable
+            assertEquals(1, cluster.availableHosts().size());
+
+            assertThat("Unable to find statement in the log.",
+                       this.recordingAppender.logContainsAny(".*Error while reading the response from the server.*"), is(true));
+
+            // Assert that we are able to send requests from the pool (it should have replaced the connection)
+            client.submit("g.V().limit(1).id()").all().get();
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void testReuseSameConnection() throws Exception {
+        final Cluster cluster = this.createClusterWithXNumOfConnection(1);
+        final Client.ClusteredClient client = cluster.connect();
+
+        try {
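+            // with a pool of size one, consecutive requests should be served over the same channel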
+            ResultSet resultSet = client.submit("g.V().limit(1)");
+            final String channelIdForFirstRequest = resultSet.getChannelId();
+
+            Assert.assertTrue(StringUtils.isNotBlank(channelIdForFirstRequest));
+            resultSet.all().get();
+
+            resultSet = client.submit("g.V().limit(1)");
+
+            final String channelIdForSecondRequest = resultSet.getChannelId();
+
+            Assert.assertTrue(StringUtils.isNotBlank(channelIdForSecondRequest));
+            resultSet.all().get();
+
+            Assert.assertEquals(channelIdForFirstRequest, channelIdForSecondRequest);
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void testTimeoutOnExpiredMaxWaitForConnection() {
+        final Cluster cluster = TestClientFactory.build()
+                                                 .maxConnectionPoolSize(2)
+                                                 .maxSimultaneousUsagePerConnection(0)
+                                                 .minSimultaneousUsagePerConnection(0)
+                                                 .maxInProcessPerConnection(0)
+                                                 .minInProcessPerConnection(0)
+                                                 .maxWaitForConnection(500)
+                                                 .create();
+
+        final Client.ClusteredClient client = cluster.connect();
+
+        try {
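+            // occupy both pooled connections with long-running scripts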
+            final CompletableFuture<ResultSet> rs1 = client.submitAsync("Thread.sleep(3000);'done'");
+            final CompletableFuture<ResultSet> rs2 = client.submitAsync("Thread.sleep(3000);'done'");
+
+            CompletableFuture.allOf(rs1, rs2).join();
+
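+            // both connections stay busy until their results are consumed, so this request should time out waiting for a free connection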
+            client.submit("g.V().limit(1)");
+
+            fail("Should throw exception");
+        } catch (Exception ex) {
+            assertThat("Should throw timeout exception on max wait expired",
+                       ex.getCause().getCause() instanceof TimeoutException, is(true));
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldFailAttemptToSendRequestToConnectionInUse() {
+        final Cluster cluster = this.createClusterWithXNumOfConnection(1);
+        final Client.ClusteredClient client = cluster.connect();
+        try {
+            // Add some test data
+            client.submit("g.inject(1).repeat(__.addV()).times(12).count()").all().get();
+
+            final String query = "g.V()";
+            final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
+                                                         .addArg(Tokens.ARGS_GREMLIN, query).create();
+            final Connection conn = client.chooseConnectionAsync(request).get();
+
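+            // the first write marks the connection as busy with its single in-flight request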
+            final CompletableFuture<ResultSet> resultFuture = new CompletableFuture<>();
+            conn.write(request, resultFuture).get();
+
+            // Second request to the same connection should fail while the first is in progress
+            conn.write(request, resultFuture);
+
+            fail("Should throw exception");
+        } catch (Exception ex) {
+            assertThat("Should throw exception on trying to send another request on busy connection",
+                       ex instanceof IllegalStateException, is(true));
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void testClientWriteOnServerUnreachable() throws Exception {
+        final Cluster cluster = this.createClusterWithXNumOfConnection(1);
+        final Client.ClusteredClient client = cluster.connect();
+        try {
+            final int resultCountToGenerate = 1000;
+            final String fatty = IntStream.range(0, 175).mapToObj(String::valueOf).collect(Collectors.joining());
+            final String fattyX = "['" + fatty + "'] * " + resultCountToGenerate;
+
+            final int batchSize = 2;
+            final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
+                                                         .addArg(Tokens.ARGS_BATCH_SIZE, batchSize)
+                                                         .addArg(Tokens.ARGS_GREMLIN, fattyX).create();
+
+            client.init();
+            final Connection conn = client.chooseConnectionAsync(request).get();
+
+            // stop the server to mimic a situation where server goes down before establishing a connection
+            this.stopServer();
+
+            final CompletableFuture<ResultSet> resultFuture = new CompletableFuture<>();
+
+            try {
+                conn.write(request, resultFuture);
+                resultFuture.join();
+                fail("Should throw exception.");
+            } catch (Exception ex) {
+                assertThat("Should throw ConnectException on unreachable server",
+                           ex.getCause() instanceof ConnectException, is(true));
+            }
+
+            this.startServer();
+
+            // Verify that the client has recovered from this error. Ideally the server should also have released
+            // the resources associated with this request, but this test doesn't verify that.
+            client.submit("g.V().limit(1)").all().get();
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void testClientCloseInMiddleOfResultConsumption() throws Exception {
+        final Cluster cluster = this.createClusterWithXNumOfConnection(1);
+        final Client.ClusteredClient client = cluster.connect();
+        try {
+            final int resultCountToGenerate = 1000;
+            final String fatty = IntStream.range(0, 175).mapToObj(String::valueOf).collect(Collectors.joining());
+            final String fattyX = "['" + fatty + "'] * " + resultCountToGenerate;
+
+            final int batchSize = 2;
+            final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
+                                                         .addArg(Tokens.ARGS_BATCH_SIZE, batchSize)
+                                                         .addArg(Tokens.ARGS_GREMLIN, fattyX).create();
+
+            client.init();
+            Connection conn = client.chooseConnectionAsync(request).get();
+            final CompletableFuture<ResultSet> resultFuture = new CompletableFuture<>();
+            conn.write(request, resultFuture).syncUninterruptibly().getNow();
+            final ResultSet rs1 = resultFuture.get();
+            final Result res = rs1.iterator().next();
+            final String channelIdForFirstRequest = rs1.getChannelId();
+
+            Assert.assertTrue(StringUtils.isNotBlank(channelIdForFirstRequest));
+
+            // verify that the server is giving out some results
+            Assert.assertEquals(fatty, res.getString());
+
+            try {
+                // return the connection to the pool while all results have not been read
+                ((SingleRequestConnection) conn).releaseResources().get();
+                resultFuture.get().one();
+                fail("Should throw exception.");
+            } catch (Exception ex) {
+                assertThat("Should throw IllegalStateException on reading results from a connection that has been returned to the pool",
+                           ex.getCause() instanceof IllegalStateException, is(true));
+            }
+
+            // Verify that the client has recovered from this abrupt close. Ideally the server should also have released
+            // the resources associated with this request, but this test doesn't verify that.
+            final RequestMessage request2 = RequestMessage.build(Tokens.OPS_EVAL)
+                                                    .addArg(Tokens.ARGS_BATCH_SIZE, batchSize)
+                                                    .addArg(Tokens.ARGS_GREMLIN, "g.V().limit(1)").create();
+            conn = client.chooseConnectionAsync(request2).get();
+            final CompletableFuture<ResultSet> resultFuture2 = new CompletableFuture<>();
+            conn.write(request2, resultFuture2).syncUninterruptibly().getNow();
+
+            final ResultSet rs2 = resultFuture2.get();
+            // Verify that the same channel is re-used again
+            final String channelIdForSecondRequest = rs2.getChannelId();
+            Assert.assertTrue(StringUtils.isNotBlank(channelIdForSecondRequest));
+
+            rs2.all().get();
+
+            Assert.assertEquals(channelIdForFirstRequest, channelIdForSecondRequest);
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldPropagateLegitResponseExceptionFromServer() throws Exception {
+        final Cluster cluster = this.createClusterWithXNumOfConnection(1);
+        final Client.ClusteredClient client = cluster.connect();
+
+        try {
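+            // g.V().X() is not a valid traversal, so the server should respond with an error result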
+            ResultSet resultSet = client.submit("g.V().X()");
+            final String channelIdForFirstRequest = resultSet.getChannelId();
+
+            Assert.assertTrue(StringUtils.isNotBlank(channelIdForFirstRequest));
+
+            try {
+                resultSet.all().get();
+            } catch (Exception ex) {
+                assertThat("Should throw ResponseException on genuine server errors.",
+                           ex.getCause() instanceof ResponseException, is(true));
+            }
+
+            resultSet = client.submit("g.V().limit(1)");
+
+            final String channelIdForSecondRequest = resultSet.getChannelId();
+
+            Assert.assertTrue(StringUtils.isNotBlank(channelIdForSecondRequest));
+            resultSet.all().get();
+
+            Assert.assertEquals(channelIdForFirstRequest, channelIdForSecondRequest);
+
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void testGracefulClose() throws ExecutionException, InterruptedException, TimeoutException {
+        // For this test to succeed, ensure that the server query timeout is greater than
+        // (number of requests / number of executor threads on the server) * wait per request.
+        final Cluster cluster = this.createClusterWithXNumOfConnection(250);
+
+        try {
+            final Client.ClusteredClient client = cluster.connect();
+
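+            // submit one request per pooled connection and drain them all below so the pool can close gracefully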
+            final int requests = 250;
+            final List<CompletableFuture<ResultSet>> futures = new ArrayList<>(requests);
+            IntStream.range(0, requests).forEach(i -> {
+                try {
+                    futures.add(client.submitAsync("Thread.sleep(100);"));
+                } catch (Exception ex) {
+                    throw new RuntimeException(ex);
+                }
+            });
+
+            assertEquals(requests, futures.size());
+
+            int counter = 0;
+            for (CompletableFuture<ResultSet> f : futures) {
+                f.get().all().get(30000, TimeUnit.MILLISECONDS);
+                counter++;
+            }
+
+            assertEquals(requests, counter);
+        } finally {
+            cluster.close();
+        }
+
+        assertThat(recordingAppender.getMessages(), hasItem("INFO - Closed ConnectionPool{closing=true, host=Host{address=localhost/127.0.0.1:45940, hostUri=ws://localhost:45940/gremlin}, BusyConnectionCount=0}\n"));
+        // No errors or warnings should be printed
+        assertThat(recordingAppender.getMessages().stream().noneMatch(m -> m.startsWith("ERROR")), is(true));
+    }
+
+    @Test
+    public void testAbruptClose() throws ExecutionException, InterruptedException, TimeoutException {
+        final Cluster cluster = this.createClusterWithXNumOfConnection(50);
+
+        final Client.ClusteredClient client = cluster.connect();
+
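+        // submit the requests but never consume the results so that all 50 connections stay busy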
+        final int requests = 50;
+        IntStream.range(0, requests).forEach(i -> {
+            try {
+                client.submitAsync("Thread.sleep(1000);");
+            } catch (Exception ex) {
+                throw new RuntimeException(ex);
+            }
+        });
+
+        // Wait for the requests to be sent to the server
+        Thread.sleep(2000);
+
+        // Close the cluster abruptly while the requests are in flight
+        cluster.close();
+
+        assertThat(recordingAppender.getMessages(), hasItem("INFO - Closing active channels borrowed from ChannelPool [BusyConnectionCount=50]\n"));
+        assertThat(recordingAppender.getMessages(), hasItem("INFO - Closed ConnectionPool{closing=true, host=Host{address=localhost/127.0.0.1:45940, hostUri=ws://localhost:45940/gremlin}, BusyConnectionCount=0}\n"));
+    }
+
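+    // Builds a cluster whose pool holds exactly x connections, each limited to a single in-flight
+    // request (the in-process and simultaneous-usage limits are zeroed out).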
+    private Cluster createClusterWithXNumOfConnection(int x) {
+        return TestClientFactory.build()
+                                .maxConnectionPoolSize(x)
+                                .maxSimultaneousUsagePerConnection(0)
+                                .minSimultaneousUsagePerConnection(0)
+                                .maxInProcessPerConnection(0)
+                                .minInProcessPerConnection(0)
+                                .create();
+    }
+}
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 8226fc3..9f92d66 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.server;
 
+import groovy.json.JsonBuilder;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.log4j.Level;
 import org.apache.tinkerpop.gremlin.TestHelper;
@@ -34,9 +35,9 @@ import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
 import org.apache.tinkerpop.gremlin.driver.ser.GraphBinaryMessageSerializerV1;
+import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0;
 import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0;
 import org.apache.tinkerpop.gremlin.driver.ser.JsonBuilderGryoSerializer;
-import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0;
 import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
 import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
 import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
@@ -49,7 +50,6 @@ import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertex;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.apache.tinkerpop.gremlin.util.TimeUtil;
-import groovy.json.JsonBuilder;
 import org.apache.tinkerpop.gremlin.util.function.FunctionUtils;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.hamcrest.core.IsInstanceOf;
@@ -63,6 +63,7 @@ import org.slf4j.LoggerFactory;
 
 import java.awt.Color;
 import java.io.File;
+import java.net.ConnectException;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -77,7 +78,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -86,9 +86,11 @@ import java.util.stream.IntStream;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.endsWith;
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.hamcrest.core.AllOf.allOf;
+import static org.hamcrest.core.StringStartsWith.startsWith;
 import static org.hamcrest.number.OrderingComparison.greaterThan;
 import static org.hamcrest.number.OrderingComparison.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
@@ -257,9 +259,10 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         // keep the connection pool size at 1 to remove the possibility of lots of connections trying to ping which will
         // complicate the assertion logic
         final Cluster cluster = TestClientFactory.build().
-                minConnectionPoolSize(1).
                 maxConnectionPoolSize(1).
-                keepAliveInterval(1000).create();
+                maxSimultaneousUsagePerConnection(0).
+                maxInProcessPerConnection(0).
+                keepAliveInterval(1002).create();
         final Client client = cluster.connect();
 
         // fire up lots of requests so as to schedule/deschedule lots of ping jobs
@@ -335,7 +338,7 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
             fail("Should not have gone through because the server is not running");
         } catch (Exception i) {
             final Throwable root = ExceptionUtils.getRootCause(i);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(ConnectException.class));
         }
 
         startServer();
@@ -369,7 +372,7 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
             fail("Should not have gone through because the server is not running");
         } catch (Exception i) {
             final Throwable root = ExceptionUtils.getRootCause(i);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(ConnectException.class));
         }
 
         startServer();
@@ -500,59 +503,67 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     @Test
     public void shouldProcessRequestsOutOfOrder() throws Exception {
         final Cluster cluster = TestClientFactory.open();
-        final Client client = cluster.connect();
+        try {
+            final Client client = cluster.connect();
 
-        final ResultSet rsFive = client.submit("Thread.sleep(5000);'five'");
-        final ResultSet rsZero = client.submit("'zero'");
+            final ResultSet rsFive = client.submit("Thread.sleep(5000);'five'");
+            final ResultSet rsZero = client.submit("'zero'");
 
-        final CompletableFuture<List<Result>> futureFive = rsFive.all();
-        final CompletableFuture<List<Result>> futureZero = rsZero.all();
+            final CompletableFuture<List<Result>> futureFive = rsFive.all();
+            final CompletableFuture<List<Result>> futureZero = rsZero.all();
 
-        final long start = System.nanoTime();
-        assertFalse(futureFive.isDone());
-        assertEquals("zero", futureZero.get().get(0).getString());
+            final long start = System.nanoTime();
+            assertFalse(futureFive.isDone());
+            assertEquals("zero", futureZero.get().get(0).getString());
 
-        logger.info("Eval of 'zero' complete: " + TimeUtil.millisSince(start));
+            logger.info("Eval of 'zero' complete: " + TimeUtil.millisSince(start));
 
-        assertFalse(futureFive.isDone());
-        assertEquals("five", futureFive.get(10, TimeUnit.SECONDS).get(0).getString());
+            assertFalse(futureFive.isDone());
+            assertEquals("five", futureFive.get(10, TimeUnit.SECONDS).get(0).getString());
 
-        logger.info("Eval of 'five' complete: " + TimeUtil.millisSince(start));
+            logger.info("Eval of 'five' complete: " + TimeUtil.millisSince(start));
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
     public void shouldProcessSessionRequestsInOrder() throws Exception {
         final Cluster cluster = TestClientFactory.open();
-        final Client client = cluster.connect(name.getMethodName());
+        try {
+            final Client client = cluster.connect(name.getMethodName());
 
-        final ResultSet rsFive = client.submit("Thread.sleep(5000);'five'");
-        final ResultSet rsZero = client.submit("'zero'");
+            final ResultSet rsFive = client.submit("Thread.sleep(5000);'five'");
+            final ResultSet rsZero = client.submit("'zero'");
 
-        final CompletableFuture<List<Result>> futureFive = rsFive.all();
-        final CompletableFuture<List<Result>> futureZero = rsZero.all();
+            final CompletableFuture<List<Result>> futureFive = rsFive.all();
+            final CompletableFuture<List<Result>> futureZero = rsZero.all();
 
-        final CountDownLatch latch = new CountDownLatch(2);
-        final List<String> order = new ArrayList<>();
-        final ExecutorService executor = Executors.newSingleThreadExecutor();
+            final CountDownLatch latch = new CountDownLatch(2);
+            final List<String> order = new ArrayList<>();
+            final ExecutorService executor = Executors.newSingleThreadExecutor();
 
-        futureFive.thenAcceptAsync(r -> {
-            order.add(r.get(0).getString());
-            latch.countDown();
-        }, executor);
+            futureFive.thenAcceptAsync(r -> {
+                order.add(r.get(0).getString());
+                latch.countDown();
+            }, executor);
 
-        futureZero.thenAcceptAsync(r -> {
-            order.add(r.get(0).getString());
-            latch.countDown();
-        }, executor);
+            futureZero.thenAcceptAsync(r -> {
+                order.add(r.get(0).getString());
+                latch.countDown();
+            }, executor);
 
-        // wait for both results
-        latch.await(30000, TimeUnit.MILLISECONDS);
+            // wait for both results
+            latch.await(30000, TimeUnit.MILLISECONDS);
 
-        // should be two results
-        assertEquals(2, order.size());
+            // should be two results
+            assertEquals(2, order.size());
 
-        // ensure that "five" is first then "zero"
-        assertThat(order, contains("five", "zero"));
+            // ensure that "five" is first then "zero"
+            assertThat(order, contains("five", "zero"));
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -741,21 +752,6 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     }
 
     @Test
-    public void shouldMarkHostDeadSinceServerIsDown() throws Exception {
-        final Cluster cluster = TestClientFactory.open();
-        assertEquals(0, cluster.availableHosts().size());
-        cluster.connect().init();
-        assertEquals(1, cluster.availableHosts().size());
-
-        stopServer();
-
-        cluster.connect().init();
-        assertEquals(0, cluster.availableHosts().size());
-
-        cluster.close();
-    }
-
-    @Test
     public void shouldFailWithBadServerSideSerialization() throws Exception {
         final Cluster cluster = TestClientFactory.open();
         final Client client = cluster.connect();
@@ -1219,25 +1215,27 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     @Test
     public void shouldExecuteScriptsInMultipleSession() throws Exception {
         final Cluster cluster = TestClientFactory.open();
-        final Client client1 = cluster.connect(name.getMethodName() + "1");
-        final Client client2 = cluster.connect(name.getMethodName() + "2");
-        final Client client3 = cluster.connect(name.getMethodName() + "3");
-
-        final ResultSet results11 = client1.submit("x = 1");
-        final ResultSet results21 = client2.submit("x = 2");
-        final ResultSet results31 = client3.submit("x = 3");
-        assertEquals(1, results11.all().get().get(0).getInt());
-        assertEquals(2, results21.all().get().get(0).getInt());
-        assertEquals(3, results31.all().get().get(0).getInt());
-
-        final ResultSet results12 = client1.submit("x + 100");
-        final ResultSet results22 = client2.submit("x * 2");
-        final ResultSet results32 = client3.submit("x * 10");
-        assertEquals(101, results12.all().get().get(0).getInt());
-        assertEquals(4, results22.all().get().get(0).getInt());
-        assertEquals(30, results32.all().get().get(0).getInt());
-
-        cluster.close();
+        try {
+            final Client client1 = cluster.connect(name.getMethodName() + "1");
+            final Client client2 = cluster.connect(name.getMethodName() + "2");
+            final Client client3 = cluster.connect(name.getMethodName() + "3");
+
+            final ResultSet results11 = client1.submit("x = 1");
+            final ResultSet results21 = client2.submit("x = 2");
+            final ResultSet results31 = client3.submit("x = 3");
+            assertEquals(1, results11.all().get().get(0).getInt());
+            assertEquals(2, results21.all().get().get(0).getInt());
+            assertEquals(3, results31.all().get().get(0).getInt());
+
+            final ResultSet results12 = client1.submit("x + 100");
+            final ResultSet results22 = client2.submit("x * 2");
+            final ResultSet results32 = client3.submit("x * 10");
+            assertEquals(101, results12.all().get().get(0).getInt());
+            assertEquals(4, results22.all().get().get(0).getInt());
+            assertEquals(30, results32.all().get().get(0).getInt());
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
index 807e9a7..b87967a 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
@@ -21,21 +21,25 @@ package org.apache.tinkerpop.gremlin.server;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.Channelizer;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
+import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
-import org.ietf.jgss.GSSException;
 import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.ietf.jgss.GSSException;
 import org.junit.Test;
 
+import java.net.ConnectException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
+import java.util.concurrent.*;
 
+import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
 import static org.hamcrest.CoreMatchers.startsWith;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.AnyOf.anyOf;
@@ -79,6 +83,52 @@ public class GremlinServerAuthIntegrateTest extends AbstractGremlinServerIntegra
     }
 
     @Test
+    public void shouldAuthenticateTraversalWithThreads() throws Exception {
+        final Cluster cluster = TestClientFactory.build().nioPoolSize(1).credentials("stephen", "password").create();
+        final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster, "gmodern"));
+
+        final ExecutorService executor = Executors.newFixedThreadPool(4);
+        final Callable<Long> countTraversalJob = () -> g.V().both().both().count().next();
+        final List<Future<Long>> results = executor.invokeAll(Collections.nCopies(100, countTraversalJob));
+
+        assertEquals(100, results.size());
+        for (int ix = 0; ix < results.size(); ix++) {
+            try {
+                assertEquals(30L, results.get(ix).get(1000, TimeUnit.MILLISECONDS).longValue());
+            } catch (Exception ex) {
+                // failure but shouldn't have
+                cluster.close();
+                fail("Exception halted assertions - " + ex.getMessage());
+            }
+        }
+
+        cluster.close();
+    }
+
+    @Test
+    public void shouldAuthenticateScriptWithThreads() throws Exception {
+        final Cluster cluster = TestClientFactory.build().nioPoolSize(1).credentials("stephen", "password").create();
+        final Client client = cluster.connect();
+
+        final ExecutorService executor = Executors.newFixedThreadPool(4);
+        final Callable<Long> countTraversalJob = () -> client.submit("gmodern.V().both().both().count()").all().get().get(0).getLong();
+        final List<Future<Long>> results = executor.invokeAll(Collections.nCopies(100, countTraversalJob));
+
+        assertEquals(100, results.size());
+        for (int ix = 0; ix < results.size(); ix++) {
+            try {
+                assertEquals(30L, results.get(ix).get(1000, TimeUnit.MILLISECONDS).longValue());
+            } catch (Exception ex) {
+                // failure but shouldn't have
+                cluster.close();
+                fail("Exception halted assertions - " + ex.getMessage());
+            }
+        }
+
+        cluster.close();
+    }
+
+    @Test
     public void shouldFailIfSslEnabledOnServerButNotClient() throws Exception {
         final Cluster cluster = TestClientFactory.open();
         final Client client = cluster.connect();
@@ -88,8 +138,8 @@ public class GremlinServerAuthIntegrateTest extends AbstractGremlinServerIntegra
             fail("This should not succeed as the client did not enable SSL");
         } catch(Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertEquals(TimeoutException.class, root.getClass());
-            assertThat(root.getMessage(), startsWith("Timed out while waiting for an available host"));
+            assertEquals(ConnectException.class, root.getClass());
+            assertThat(root.getMessage(), startsWith("Unable to find a valid connection"));
         } finally {
             cluster.close();
         }
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java
index 029a408..de838e0 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java
@@ -18,27 +18,39 @@
  */
 package org.apache.tinkerpop.gremlin.server;
 
+import org.apache.commons.configuration2.BaseConfiguration;
+import org.apache.commons.configuration2.Configuration;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.Channelizer;
 import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.driver.ser.GraphBinaryMessageSerializerV1;
+import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
 import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0;
 import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.server.auth.Krb5Authenticator;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.RemoteGraph;
 import org.ietf.jgss.GSSException;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.*;
+import java.util.function.Supplier;
 import javax.security.auth.login.LoginException;
 
+import static org.apache.tinkerpop.gremlin.process.remote.RemoteConnection.GREMLIN_REMOTE_CONNECTION_CLASS;
+import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -54,6 +66,16 @@ public class GremlinServerAuthKrb5IntegrateTest extends AbstractGremlinServerInt
 
     private KdcFixture kdcServer;
 
+    private final Supplier<Graph> graphGetter = () -> server.getServerGremlinExecutor().getGraphManager().getGraph("graph");
+    private final Configuration conf = new BaseConfiguration() {{
+        setProperty(Graph.GRAPH, RemoteGraph.class.getName());
+        setProperty(GREMLIN_REMOTE_CONNECTION_CLASS, DriverRemoteConnection.class.getName());
+        setProperty(DriverRemoteConnection.GREMLIN_REMOTE_DRIVER_SOURCENAME, "g");
+        setProperty("hidden.for.testing.only", graphGetter);
+        setProperty("clusterConfiguration.port", TestClientFactory.PORT);
+        setProperty("clusterConfiguration.hosts", "localhost");
+    }};
+
     @Before
     @Override
     public void setUp() throws Exception {
@@ -87,6 +109,7 @@ public class GremlinServerAuthKrb5IntegrateTest extends AbstractGremlinServerInt
 
         final String nameOfTest = name.getMethodName();
         switch (nameOfTest) {
+            case "shouldAuthenticateWithThreads":
             case "shouldAuthenticateWithDefaults":
             case "shouldFailWithoutClientJaasEntry":
             case "shouldFailWithoutClientTicketCache":
@@ -118,6 +141,58 @@ public class GremlinServerAuthKrb5IntegrateTest extends AbstractGremlinServerInt
     }
 
     @Test
+    public void shouldAuthenticateTraversalWithThreads() throws Exception {
+        final Cluster cluster = TestClientFactory.build()
+                .nioPoolSize(1)
+                .jaasEntry(TESTCONSOLE)
+                .protocol(kdcServer.serverPrincipalName).addContactPoint(kdcServer.hostname).create();
+        final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster, "gmodern"));
+
+        final ExecutorService executor = Executors.newFixedThreadPool(4);
+        final Callable<Long> countTraversalJob = () -> g.V().both().both().count().next();
+        final List<Future<Long>> results = executor.invokeAll(Collections.nCopies(100, countTraversalJob));
+
+        assertEquals(100, results.size());
+        for (int ix = 0; ix < results.size(); ix++) {
+            try {
+                assertEquals(30L, results.get(ix).get(1000, TimeUnit.MILLISECONDS).longValue());
+            } catch (Exception ex) {
+                // failure but shouldn't have
+                cluster.close();
+                fail("Exception halted assertions - " + ex.getMessage());
+            }
+        }
+
+        cluster.close();
+    }
+
+    @Test
+    public void shouldAuthenticateScriptWithThreads() throws Exception {
+        final Cluster cluster = TestClientFactory.build()
+                .nioPoolSize(1)
+                .jaasEntry(TESTCONSOLE)
+                .protocol(kdcServer.serverPrincipalName).addContactPoint(kdcServer.hostname).create();
+        final Client client = cluster.connect();
+
+        final ExecutorService executor = Executors.newFixedThreadPool(4);
+        final Callable<Long> countTraversalJob = () -> client.submit("gmodern.V().both().both().count()").all().get().get(0).getLong();
+        final List<Future<Long>> results = executor.invokeAll(Collections.nCopies(100, countTraversalJob));
+
+        assertEquals(100, results.size());
+        for (int ix = 0; ix < results.size(); ix++) {
+            try {
+                assertEquals(30L, results.get(ix).get(1000, TimeUnit.MILLISECONDS).longValue());
+            } catch (Exception ex) {
+                // failure but shouldn't have
+                cluster.close();
+                fail("Exception halted assertions - " + ex.getMessage());
+            }
+        }
+
+        cluster.close();
+    }
+
+    @Test
     public void shouldAuthenticateWithDefaults() throws Exception {
         final Cluster cluster = TestClientFactory.build().jaasEntry(TESTCONSOLE)
                 .protocol(kdcServer.serverPrincipalName).addContactPoint(kdcServer.hostname).create();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index 9a97064..81ea473 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -46,7 +46,6 @@ import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.GroovyCompilerGremlinPlugin;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.SimpleSandboxExtension;
 import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
-import org.apache.tinkerpop.gremlin.structure.RemoteGraph;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
@@ -54,6 +53,7 @@ import org.apache.tinkerpop.gremlin.server.handler.OpSelectorHandler;
 import org.apache.tinkerpop.gremlin.server.op.AbstractEvalOpProcessor;
 import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.RemoteGraph;
 import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
@@ -64,6 +64,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.lang.reflect.Field;
+import java.net.ConnectException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -579,7 +581,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
             fail("Should throw exception because ssl is enabled on the server but not on client");
         } catch(Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(ConnectException.class));
         } finally {
             cluster.close();
         }
@@ -622,7 +624,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
             fail("Should throw exception because ssl client auth is enabled on the server but client does not have a cert");
         } catch (Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(ConnectException.class));
         } finally {
             cluster.close();
         }
@@ -639,7 +641,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
             fail("Should throw exception because ssl client auth is enabled on the server but does not trust client's cert");
         } catch (Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(ConnectException.class));
         } finally {
             cluster.close();
         }
@@ -656,7 +658,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
             fail("Should throw exception because ssl client requires TLSv1.2 whereas server supports only TLSv1.1");
         } catch (Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(ConnectException.class));
         } finally {
             cluster.close();
         }
@@ -673,7 +675,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
             fail("Should throw exception because ssl client requires TLSv1.2 whereas server supports only TLSv1.1");
         } catch (Exception x) {
             final Throwable root = ExceptionUtils.getRootCause(x);
-            assertThat(root, instanceOf(TimeoutException.class));
+            assertThat(root, instanceOf(ConnectException.class));
         } finally {
             cluster.close();
         }
@@ -1041,14 +1043,14 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
             resultSet.all().get(10000, TimeUnit.MILLISECONDS);
             fail("Should throw an exception.");
         } catch (TimeoutException te) {
-            // the request should not have timed-out - the connection should have been reset, but it seems that
+            // the request should not have timed-out - the connection pool should have been reset, but it seems that
             // timeout seems to occur as well on some systems (it's not clear why).  however, the nature of this
             // test is to ensure that the script isn't processed if it exceeds a certain size, so in this sense
             // it seems ok to pass in this case.
         } catch (Exception re) {
             final Throwable root = ExceptionUtils.getRootCause(re);
             // Netty closes the channel to the server on a non-recoverable error such as CorruptedFrameException
-            // and the connection is subsequently destroyed. Each of the pending requests are given an error with
+            // and the connectionPool is subsequently destroyed. Each of the pending requests is given an error with
             // the following error message.
             //
             // went with two possible error messages here as i think that there is some either non-deterministic
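
As the comments in this hunk note, an oversized script can surface on the
client either as a timeout or as the error fanned out to pending requests
once Netty closes the channel. A hedged sketch of tolerating both outcomes,
mirroring the test's 10 second budget:

    try {
        resultSet.all().get(10000, TimeUnit.MILLISECONDS);
    } catch (TimeoutException te) {
        // acceptable: the pool was reset but this caller's deadline expired first
    } catch (ExecutionException ee) {
        // acceptable: the channel died on a non-recoverable frame error and the
        // pending request was completed exceptionally; inspect the root cause
        final Throwable root = ExceptionUtils.getRootCause(ee);
    }
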
@@ -1067,7 +1069,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.build().create();
         final Client client = cluster.connect();
 
-        // ensure that connection to server is good
+        // ensure that the connectionPool to the server is good
         assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
 
         // kill the server which will make the client mark the host as unavailable
@@ -1079,8 +1081,8 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
             fail("Should throw an exception.");
         } catch (RuntimeException re) {
-            // Client would have no active connections to the host, hence it would encounter a timeout
-            // trying to find an alive connection to the host.
-            assertThat(re.getCause().getCause() instanceof TimeoutException, is(true));
+            // Client would have no active connections to the host, hence it would encounter a
+            // ConnectException while trying to find an alive connectionPool for the host.
+            assertThat(re.getCause().getCause() instanceof ConnectException, is(true));
 
             //
             // should recover when the server comes back
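
The recovery path this hunk describes (ConnectException while the host is
down, success once it returns) implies a simple retry on the caller's side;
a sketch only, with the backoff interval being an assumption rather than a
driver recommendation:

    Result result = null;
    while (result == null) {
        try {
            result = client.submit("1+1").all().join().get(0);
        } catch (RuntimeException re) {
            if (!(re.getCause().getCause() instanceof ConnectException)) throw re;
            Thread.sleep(1000); // host still marked unavailable; wait and retry
        }
    }
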
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
index b1aa835..a6974a8 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
@@ -310,7 +310,7 @@ public class GremlinServerSessionIntegrateTest  extends AbstractGremlinServerInt
             client.submit("x[1]+2").all().get();
             fail("Session should be dead");
         } catch (Exception ex) {
-            final Throwable cause = ExceptionUtils.getCause(ex);
+            final Throwable cause = ex.getCause();
             assertThat(cause, instanceOf(ResponseException.class));
             assertEquals(ResponseStatusCode.SERVER_ERROR_EVALUATION, ((ResponseException) cause).getResponseStatusCode());
 
@@ -320,39 +320,47 @@ public class GremlinServerSessionIntegrateTest  extends AbstractGremlinServerInt
             cluster.close();
         }
 
-        // there will be on for the timeout and a second for closing the cluster
+        // there will be one for the timeout and a second for closing the cluster
         assertEquals(2, recordingAppender.getMessages().stream()
-                .filter(msg -> msg.equals("INFO - Session shouldHaveTheSessionTimeout closed\n")).count());
+                .filter(msg -> msg.contains("Session shouldHaveTheSessionTimeout closed")).count());
     }
 
     @Test
     public void shouldEnsureSessionBindingsAreThreadSafe() throws Exception {
-        final Cluster cluster = TestClientFactory.build().minInProcessPerConnection(16).maxInProcessPerConnection(64).create();
-        final Client client = cluster.connect(name.getMethodName());
-
-        client.submitAsync("a=100;b=1000;c=10000;null");
-        final int requests = 10000;
-        final List<CompletableFuture<ResultSet>> futures = new ArrayList<>(requests);
-        IntStream.range(0, requests).forEach(i -> {
-            try {
-                futures.add(client.submitAsync("a+b+c"));
-            } catch (Exception ex) {
-                throw new RuntimeException(ex);
+        final Cluster cluster = TestClientFactory.build()
+                                                 .maxConnectionPoolSize(1000)
+                                                 .maxInProcessPerConnection(0) // disable these deprecated parameters
+                                                 .maxSimultaneousUsagePerConnection(0) // disable these deprecated parameters
+                                                 .create();
+        try {
+            final Client client = cluster.connect(name.getMethodName());
+
+            // It is important for this query to be synchronously executed to set the values
+            // of a,b,c on the server before other queries start to execute.
+            client.submit("a=100;b=1000;c=10000;null").all().join();
+            final int requests = 1000;
+            final List<CompletableFuture<ResultSet>> futures = new ArrayList<>(requests);
+            IntStream.range(0, requests).forEach(i -> {
+                try {
+                    futures.add(client.submitAsync("a+b+c"));
+                } catch (Exception ex) {
+                    throw new RuntimeException(ex);
+                }
+            });
+
+            assertEquals(requests, futures.size());
+
+            int counter = 0;
+            for (CompletableFuture<ResultSet> f : futures) {
+                final Result r = f.get().all().get(30000, TimeUnit.MILLISECONDS).get(0);
+                assertEquals(11100, r.getInt());
+                counter++;
             }
-        });
-
-        assertEquals(requests, futures.size());
 
-        int counter = 0;
-        for(CompletableFuture<ResultSet> f : futures) {
-            final Result r = f.get().all().get(30000, TimeUnit.MILLISECONDS).get(0);
-            assertEquals(11100, r.getInt());
-            counter++;
+            assertEquals(requests, counter);
+        } finally {
+            cluster.close();
         }
-
-        assertEquals(requests, counter);
-
-        cluster.close();
     }
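
The rewritten test above also serves as a usage template for the new
connection model: pool capacity replaces the per-connection multiplexing
knobs, which remain only as deprecated settings. A minimal sketch against
the public builder (the values mirror the test and are illustrative, not
tuning advice):

    final Cluster cluster = Cluster.build("localhost")
            .maxConnectionPoolSize(1000)          // one in-flight request per channel, so this bounds concurrency
            .maxInProcessPerConnection(0)         // deprecated by this change; 0 disables it
            .maxSimultaneousUsagePerConnection(0) // deprecated by this change; 0 disables it
            .create();
    try {
        final Client client = cluster.connect("my-session"); // session name is illustrative
        client.submit("a=100;b=1000;c=10000;null").all().join();
    } finally {
        cluster.close();
    }
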
 
     @Test