You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by di...@apache.org on 2020/08/22 01:03:19 UTC

[tinkerpop] branch 3.4-dev updated: TINKERPOP-2369 Replace Connection on server initiated Channel close (#1309)

This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch 3.4-dev
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/3.4-dev by this push:
     new b6a64aa  TINKERPOP-2369 Replace Connection on server initiated Channel close (#1309)
b6a64aa is described below

commit b6a64aa07fba6c70c8727860112dd4e918017df3
Author: Divij Vaidya <di...@gmail.com>
AuthorDate: Fri Aug 21 18:02:16 2020 -0700

    TINKERPOP-2369 Replace Connection on server initiated Channel close (#1309)
    
    * TINKERPOP-2369 Remove Connection from ConnectionPool on server initiated Channel close
    
    Changes
    1. With this change, we introduce a callback on Channel close which would remove the connection from the ConnectionPool and destroy it. This would ensure that any new request would not be assigned to the already closed Channel.
    2. With this change, we introduce a new lock on Connection which will ensure that replacement of a Connection is done only once.
---
 CHANGELOG.asciidoc                                 |    3 +-
 .../tinkerpop/gremlin/driver/Channelizer.java      |   10 +-
 .../tinkerpop/gremlin/driver/Connection.java       |  102 +-
 .../tinkerpop/gremlin/driver/ConnectionPool.java   |   40 +-
 .../gremlin/driver/SimpleWebSocketServer.java      |   52 +
 .../gremlin/driver/TestWSGremlinInitializer.java   |  140 +++
 .../driver/TestWebSocketServerInitializer.java     |   47 +
 .../WebSocketClientBehaviorIntegrateTest.java      |  214 ++++
 .../gremlin/util/Log4jRecordingAppender.java       |   89 ++
 .../gremlin/util/Log4jRecordingAppenderTest.java   |   84 ++
 .../gremlin/server/auth/Krb5Authenticator.java     |    5 +-
 .../driver/ClientConnectionIntegrateTest.java      |    1 -
 .../AbstractGremlinServerIntegrationTest.java      |    4 +-
 .../gremlin/server/GremlinDriverIntegrateTest.java | 1091 +++++++++++---------
 .../server/GremlinServerAuditLogIntegrateTest.java |    5 -
 .../server/GremlinServerSessionIntegrateTest.java  |   21 +-
 ...tractGremlinServerChannelizerIntegrateTest.java |   38 +-
 17 files changed, 1383 insertions(+), 563 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 5bdaaaf..96ef614 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -22,8 +22,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 
 [[release-3-4-9]]
 === TinkerPop 3.4.9 (Release Date: NOT OFFICIALLY RELEASED YET)
-
-
+* Remove `Connection` from `Connection Pool` when server closes a connection with no pending requests.
 
 [[release-3-4-8]]
 === TinkerPop 3.4.8 (Release Date: August 3, 2020)
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index c4bd3e1..a2841a8 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -58,7 +58,7 @@ public interface Channelizer extends ChannelHandler {
     public void init(final Connection connection);
 
     /**
-     * Called on {@link Connection#close()} to perform an {@code Channelizer} specific functions.  Note that the
+     * Called on {@link Connection#closeAsync()} to perform an {@code Channelizer} specific functions.  Note that the
      * {@link Connection} already calls {@code Channel.close()} so there is no need to call that method here.
      * An implementation will typically use this method to send a {@code Channelizer} specific message to the
      * server to notify of shutdown coming from the client side (e.g. a "close" websocket frame).
@@ -148,6 +148,7 @@ public interface Channelizer extends ChannelHandler {
      * WebSocket {@link Channelizer} implementation.
      */
     public final class WebSocketChannelizer extends AbstractChannelizer {
+        private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WebSocketChannelizer.class);
         private WebSocketClientHandler handler;
 
         private WebSocketGremlinRequestEncoder webSocketGremlinRequestEncoder;
@@ -180,7 +181,12 @@ public interface Channelizer extends ChannelHandler {
          */
         @Override
         public void close(final Channel channel) {
-            if (channel.isOpen()) channel.writeAndFlush(new CloseWebSocketFrame());
+            if (channel.isOpen()) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Sending CloseWS frame to server.");
+                }
+                channel.writeAndFlush(new CloseWebSocketFrame());
+            }
         }
 
         @Override
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 50738d9..805de0c 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -18,17 +18,18 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
-import io.netty.handler.codec.CodecException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.process.traversal.util.ConnectiveP;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.URI;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -76,6 +77,10 @@ final class Connection {
      * busy a particular {@code Connection} is.
      */
     public final AtomicInteger borrowed = new AtomicInteger(0);
+    /**
+     * This boolean guards the replace of the connection and ensures that it only occurs once.
+     */
+    public final AtomicBoolean isBeingReplaced = new AtomicBoolean(false);
     private final AtomicReference<Class<Channelizer>> channelizerClass = new AtomicReference<>(null);
 
     private final int maxInProcess;
@@ -98,7 +103,8 @@ final class Connection {
 
         connectionLabel = String.format("Connection{host=%s}", pool.host);
 
-        if (cluster.isClosing()) throw new IllegalStateException("Cannot open a connection with the cluster after close() is called");
+        if (cluster.isClosing())
+            throw new IllegalStateException("Cannot open a connection with the cluster after close() is called");
 
         final Bootstrap b = this.cluster.getFactory().createBootstrap();
         try {
@@ -113,6 +119,26 @@ final class Connection {
             channel = b.connect(uri.getHost(), uri.getPort()).sync().channel();
             channelizer.connected();
 
+            /* Configure behaviour on close of this channel.
+             *
+             * This callback would trigger the workflow to destroy this connection, so that a new request doesn't pick
+             * this closed connection.
+             */
+            final Connection thisConnection = this;
+            channel.closeFuture().addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if(logger.isDebugEnabled()) {
+                        logger.debug("OnChannelClose callback called for channel {}", channel.id().asShortText());
+                    }
+                    // Replace the channel if it was not intentionally closed using CloseAsync method.
+                    if (thisConnection.closeFuture.get() == null) {
+                        // delegate the task to worker thread and free up the event loop
+                        thisConnection.cluster.executor().submit(() -> thisConnection.pool.definitelyDestroyConnection(thisConnection));
+                    }
+                }
+            });
+
             logger.info("Created new connection for {}", uri);
 
             scheduleKeepAlive();
@@ -134,12 +160,12 @@ final class Connection {
 
     /**
      * Consider a connection as dead if the underlying channel is not connected.
-     *
+     * <p>
      * Note: A dead connection does not necessarily imply that the server is unavailable. Additional checks
      * should be performed to mark the server host as unavailable.
      */
     public boolean isDead() {
-        return (channel !=null && !channel.isActive());
+        return (channel != null && !channel.isActive());
     }
 
     boolean isClosing() {
@@ -191,14 +217,6 @@ final class Connection {
         return future;
     }
 
-    public void close() {
-        try {
-            closeAsync().get();
-        } catch (Exception ex) {
-            throw new RuntimeException(ex);
-        }
-    }
-
     public ChannelPromise write(final RequestMessage requestMessage, final CompletableFuture<ResultSet> future) {
         // once there is a completed write, then create a traverser for the result set and complete
         // the promise so that the client knows that that it can start checking for results.
@@ -210,7 +228,7 @@ final class Connection {
                         if (logger.isDebugEnabled())
                             logger.debug(String.format("Write on connection %s failed", thisConnection.getConnectionInfo()), f.cause());
 
-                        handleConnectionCleanupOnError(thisConnection, f.cause());
+                        handleConnectionCleanupOnError(thisConnection);
 
                         cluster.executor().submit(() -> future.completeExceptionally(f.cause()));
                     } else {
@@ -220,7 +238,13 @@ final class Connection {
                         // the callback for when the read was successful, meaning that ResultQueue.markComplete()
                         // was called
                         readCompleted.thenAcceptAsync(v -> {
+                            // connection is fine, just return it to the pool
                             thisConnection.returnToPool();
+
+                            // While this request was in process, close might have been signaled in closeAsync().
+                            // However, close would be blocked until all pending requests are completed. Attempt
+                            // the shutdown if the returned result cleared up the last pending message and unblocked
+                            // the close.
                             tryShutdown();
                         }, cluster.executor());
 
@@ -229,11 +253,12 @@ final class Connection {
                         // so this isn't going to be like a potentially dead host situation which is handled above on a failed
                         // write operation.
                         readCompleted.exceptionally(t -> {
+                            handleConnectionCleanupOnError(thisConnection);
 
-                            handleConnectionCleanupOnError(thisConnection, t);
-
-                            // close was signaled in closeAsync() but there were pending messages at that time. attempt
-                            // the shutdown if the returned result cleared up the last pending message
+                            // While this request was in process, close might have been signaled in closeAsync().
+                            // However, close would be blocked until all pending requests are completed. Attempt
+                            // the shutdown if the returned result cleared up the last pending message and unblocked
+                            // the close.
                             tryShutdown();
 
                             return null;
@@ -275,7 +300,7 @@ final class Connection {
         }
     }
 
-    public void returnToPool() {
+    private void returnToPool() {
         try {
             if (pool != null) pool.returnConnection(this);
         } catch (ConnectionException ce) {
@@ -284,7 +309,7 @@ final class Connection {
         }
     }
 
-    private void handleConnectionCleanupOnError(final Connection thisConnection, final Throwable t) {
+    private void handleConnectionCleanupOnError(final Connection thisConnection) {
         if (thisConnection.isDead()) {
             if (pool != null) pool.replaceConnection(thisConnection);
         } else {
@@ -293,7 +318,7 @@ final class Connection {
     }
 
     private boolean isOkToClose() {
-        return pending.isEmpty() || (channel !=null && !channel.isOpen()) || !pool.host.isAvailable();
+        return pending.isEmpty() || (channel != null && !channel.isOpen()) || !pool.host.isAvailable();
     }
 
     /**
@@ -311,7 +336,6 @@ final class Connection {
         // messages at the server and leads to ugly log messages over there.
         if (shutdownInitiated.compareAndSet(false, true)) {
             final String connectionInfo = this.getConnectionInfo();
-
             // this block of code that "closes" the session is deprecated as of 3.3.11 - this message is going to be
             // removed at 3.5.0. we will instead bind session closing to the close of the channel itself and not have
             // this secondary operation here which really only acts as a means for clearing resources in a functioning
@@ -320,7 +344,7 @@ final class Connection {
             // is annoyed that a long run operation is happening and they want an immediate cancellation. that's the
             // most likely use case. we also get the nice benefit that this if/then code just goes away as the
             // Connection really shouldn't care about the specific Client implementation.
-            if (client instanceof Client.SessionedClient) {
+            if (client instanceof Client.SessionedClient && !isDead()) {
                 final boolean forceClose = client.getSettings().getSession().get().isForceClosed();
                 final RequestMessage closeMessage = client.buildMessage(
                         RequestMessage.build(Tokens.OPS_CLOSE).addArg(Tokens.ARGS_FORCE, forceClose)).create();
@@ -360,18 +384,34 @@ final class Connection {
                 }
             });
 
-            channel.close(promise);
+            // close the netty channel, if not already closed
+            if (!channel.closeFuture().isDone()) {
+                channel.close(promise);
+            } else {
+                if (!promise.trySuccess()) {
+                    logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
+                }
+            }
         }
     }
 
     public String getConnectionInfo() {
-        return String.format("Connection{host=%s, isDead=%s, borrowed=%s, pending=%s}",
-                pool.host, isDead(), borrowed, pending.size());
+        return String.format("Connection{channel=%s, host=%s, isDead=%s, borrowed=%s, pending=%s}",
+                channel, pool.host, isDead(), borrowed, pending.size());
+    }
+
+    /**
+     * Returns the short ID for the underlying channel for this connection.
+     * <p>
+     * Currently only used for testing.
+     */
+    String getChannelId() {
+        return (channel != null) ? channel.id().asShortText() : "";
     }
 
     @Override
     public String toString() {
-        return connectionLabel;
+        return String.format(connectionLabel + ", {channel=%s}", getChannelId());
     }
 
     /**
@@ -396,7 +436,7 @@ final class Connection {
                 shutdown(future);
                 boolean interrupted = false;
                 try {
-                    while(null == self) {
+                    while (null == self) {
                         try {
                             Thread.sleep(1);
                         } catch (InterruptedException e) {
@@ -405,7 +445,7 @@ final class Connection {
                     }
                     self.cancel(false);
                 } finally {
-                    if(interrupted) {
+                    if (interrupted) {
                         Thread.currentThread().interrupt();
                     }
                 }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index 332731e..f7b35e9 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -24,6 +24,9 @@ import org.apache.tinkerpop.gremlin.util.TimeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -33,11 +36,13 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 /**
  * @author Stephen Mallette (http://stephen.genoprime.com)
@@ -92,12 +97,14 @@ final class ConnectionPool {
         this.connections = new CopyOnWriteArrayList<>();
 
         try {
-            for (int i = 0; i < minPoolSize; i++)
+            for (int i = 0; i < minPoolSize; i++) {
                 this.connections.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
+            }
+
         } catch (ConnectionException ce) {
             // ok if we don't get it initialized here - when a request is attempted in a connection from the
             // pool it will try to create new connections as needed.
-            logger.debug("Could not initialize connections in pool for {} - pool size at {}", host, this.connections.size());
+            logger.info("Could not initialize connections in pool for {} - pool size at {}", host, this.connections.size(), ce);
             considerHostUnavailable();
         }
 
@@ -214,7 +221,7 @@ final class ConnectionPool {
     }
 
     public boolean isClosed() {
-        return closeFuture.get() != null;
+        return this.closeFuture.get() != null;
     }
 
     /**
@@ -228,6 +235,7 @@ final class ConnectionPool {
         announceAllAvailableConnection();
         final CompletableFuture<Void> future = killAvailableConnections();
         closeFuture.set(future);
+
         return future;
     }
 
@@ -246,11 +254,20 @@ final class ConnectionPool {
             futures.add(future);
         }
 
-        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
+        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
     }
 
+    /**
+     * This method is not idempotent and should only be called once per connection.
+     */
     void replaceConnection(final Connection connection) {
-        logger.debug("Replace {}", connection);
+        logger.info("Replace {}", connection);
+
+        // Do not replace connection if the conn pool is closing/closed.
+        // Do not replace connection if it is already being replaced.
+        if (connection.isBeingReplaced.getAndSet(true) || isClosed()) {
+            return;
+        }
 
         considerNewConnection();
         definitelyDestroyConnection(connection);
@@ -323,7 +340,7 @@ final class ConnectionPool {
         return true;
     }
 
-    private void definitelyDestroyConnection(final Connection connection) {
+    public void definitelyDestroyConnection(final Connection connection) {
         // only add to the bin for future removal if its not already there.
         if (!bin.contains(connection) && !connection.isClosing()) {
             bin.add(connection);
@@ -335,6 +352,7 @@ final class ConnectionPool {
         if (connection.isDead() || connection.borrowed.get() == 0) {
             if(bin.remove(connection)) {
                 connection.closeAsync();
+                // TODO: Log the following message on completion of the future returned by closeAsync.
                 logger.debug("{} destroyed", connection.getConnectionInfo());
             }
         }
@@ -417,7 +435,7 @@ final class ConnectionPool {
             this.cluster.loadBalancingStrategy().onAvailable(h);
             return true;
         } catch (Exception ex) {
-            logger.debug("Failed reconnect attempt on {}", h);
+            logger.debug("Failed reconnect attempt on {}", h, ex);
             if (connection != null) definitelyDestroyConnection(connection);
             return false;
         }
@@ -475,6 +493,14 @@ final class ConnectionPool {
         }
     }
 
+    /**
+     * Returns the set of Channel IDs maintained by the connection pool.
+     * Currently, only used for testing.
+     */
+    Set<String> getConnectionIDs() {
+        return connections.stream().map(Connection::getChannelId).collect(Collectors.toSet());
+    }
+
     public String getPoolInfo() {
         final StringBuilder sb = new StringBuilder("ConnectionPool (");
         sb.append(host);
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/SimpleWebSocketServer.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/SimpleWebSocketServer.java
new file mode 100644
index 0000000..5dad5ac
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/SimpleWebSocketServer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+
+/**
+ * Simple Netty Server
+ */
+public class SimpleWebSocketServer {
+    public static final int PORT = 45940;
+    private EventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
+
+    public Channel start(TestWebSocketServerInitializer channelInitializer) throws InterruptedException {
+        bossGroup = new NioEventLoopGroup(1);
+        workerGroup = new NioEventLoopGroup();
+        ServerBootstrap b = new ServerBootstrap();
+        b.group(bossGroup, workerGroup)
+                .channel(NioServerSocketChannel.class)
+                .handler(new LoggingHandler(LogLevel.INFO))
+                .childHandler(channelInitializer);
+        return b.bind(PORT).sync().channel();
+    }
+
+    public void stop() {
+        bossGroup.shutdownGracefully();
+        workerGroup.shutdownGracefully();
+    }
+}
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/TestWSGremlinInitializer.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/TestWSGremlinInitializer.java
new file mode 100644
index 0000000..b21b353
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/TestWSGremlinInitializer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerV2d0;
+import org.apache.tinkerpop.gremlin.driver.ser.SerializationException;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+
+import java.util.List;
+import java.util.UUID;
+
+
+/**
+ * Initializer which partially mimics the Gremlin Server. This initializer injects a handler in the
+ * server pipeline that can be modified to send the desired response for a test case.
+ */
+public class TestWSGremlinInitializer extends TestWebSocketServerInitializer {
+    private static final Logger logger = LoggerFactory.getLogger(TestWSGremlinInitializer.class);
+    /**
+     * If a request with this ID comes to the server, the server responds back with a single vertex picked from Modern
+     * graph.
+     */
+    public static final UUID SINGLE_VERTEX_REQUEST_ID =
+            UUID.fromString("6457272A-4018-4538-B9AE-08DD5DDC0AA1");
+
+    /**
+     * If a request with this ID comes to the server, the server responds back with a single vertex picked from Modern
+     * graph. After some delay, server sends a Close WebSocket frame on the same connection.
+     */
+    public static final UUID SINGLE_VERTEX_DELAYED_CLOSE_CONNECTION_REQUEST_ID =
+            UUID.fromString("3cb39c94-9454-4398-8430-03485d08bdae");
+
+    public static final UUID FAILED_AFTER_DELAY_REQUEST_ID =
+            UUID.fromString("edf79c8b-1d32-4102-a5d2-a5feeca40864");
+    public static final UUID CLOSE_CONNECTION_REQUEST_ID =
+            UUID.fromString("0150143b-00f9-48a7-a268-28142d902e18");
+    public static final UUID CLOSE_CONNECTION_REQUEST_ID_2 =
+            UUID.fromString("3c4cf18a-c7f2-4dad-b9bf-5c701eb33000");
+    public static final UUID RESPONSE_CONTAINS_SERVER_ERROR_REQUEST_ID =
+            UUID.fromString("0d333b1d-6e91-4807-b915-50b9ad721d20");
+
+    /**
+     * Gremlin serializer used for serializing/deserializing the request/response. This should be same as client.
+     */
+    private static final GraphSONMessageSerializerV2d0 SERIALIZER = new GraphSONMessageSerializerV2d0();
+
+    @Override
+    public void postInit(ChannelPipeline pipeline) {
+        pipeline.addLast(new ClientTestConfigurableHandler());
+    }
+
+    /**
+     * Handler introduced in the server pipeline to configure expected response for test cases.
+     */
+    private static class ClientTestConfigurableHandler extends MessageToMessageDecoder<BinaryWebSocketFrame> {
+        @Override
+        protected void decode(final ChannelHandlerContext ctx, final BinaryWebSocketFrame frame, final List<Object> objects)
+                throws Exception {
+            final ByteBuf messageBytes = frame.content();
+            final byte len = messageBytes.readByte();
+            if (len <= 0) {
+                objects.add(RequestMessage.INVALID);
+                return;
+            }
+
+            final ByteBuf contentTypeBytes = ctx.alloc().buffer(len);
+            try {
+                messageBytes.readBytes(contentTypeBytes);
+            } finally {
+                contentTypeBytes.release();
+            }
+            final RequestMessage msg = SERIALIZER.deserializeRequest(messageBytes.discardReadBytes());
+
+            if (msg.getRequestId().equals(SINGLE_VERTEX_DELAYED_CLOSE_CONNECTION_REQUEST_ID)) {
+                logger.info("sending vertex result frame");
+                ctx.channel().writeAndFlush(new TextWebSocketFrame(returnSingleVertexResponse(
+                        SINGLE_VERTEX_DELAYED_CLOSE_CONNECTION_REQUEST_ID)));
+                logger.info("waiting for 2 sec");
+                Thread.sleep(2000);
+                logger.info("sending close frame");
+                ctx.channel().writeAndFlush(new CloseWebSocketFrame());
+            } else if (msg.getRequestId().equals(SINGLE_VERTEX_REQUEST_ID)) {
+                logger.info("sending vertex result frame");
+                ctx.channel().writeAndFlush(new TextWebSocketFrame(returnSingleVertexResponse(SINGLE_VERTEX_REQUEST_ID)));
+            } else if (msg.getRequestId().equals(FAILED_AFTER_DELAY_REQUEST_ID)) {
+                logger.info("waiting for 2 sec");
+                Thread.sleep(1000);
+                ResponseMessage responseMessage = ResponseMessage.build(msg)
+                        .code(ResponseStatusCode.SERVER_ERROR)
+                        .statusAttributeException(new RuntimeException()).create();
+                ctx.channel().writeAndFlush(new TextWebSocketFrame(SERIALIZER.serializeResponseAsString(responseMessage)));
+            } else if (msg.getRequestId().equals(CLOSE_CONNECTION_REQUEST_ID)) {
+                Thread.sleep(1000);
+                ctx.channel().writeAndFlush(new CloseWebSocketFrame());
+            } else if (msg.getRequestId().equals(RESPONSE_CONTAINS_SERVER_ERROR_REQUEST_ID)) {
+                Thread.sleep(1000);
+                ctx.channel().writeAndFlush(new CloseWebSocketFrame());
+            }
+        }
+
+        private String returnSingleVertexResponse(UUID requestID) throws SerializationException {
+            final TinkerGraph graph = TinkerFactory.createClassic();
+            final GraphTraversalSource g = graph.traversal();
+            final Vertex t = g.V().limit(1).next();
+
+            return SERIALIZER.serializeResponseAsString(ResponseMessage.build(requestID).result(t).create());
+        }
+    }
+}
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/TestWebSocketServerInitializer.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/TestWebSocketServerInitializer.java
new file mode 100644
index 0000000..205b7c1
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/TestWebSocketServerInitializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
+
+/**
+ * A vanilla WebSocket server Initializer implementation using Netty. This initializer would configure the server for
+ * WebSocket handshake and decoding incoming WebSocket frames.
+ */
+public abstract class TestWebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
+    private static final String WEBSOCKET_PATH = "/gremlin";
+
+    @Override
+    public void initChannel(SocketChannel ch) {
+        ChannelPipeline pipeline = ch.pipeline();
+        pipeline.addLast(new HttpServerCodec());
+        pipeline.addLast(new HttpObjectAggregator(65536));
+        pipeline.addLast(new WebSocketServerCompressionHandler());
+        pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true));
+        this.postInit(ch.pipeline());
+    }
+
+    public abstract void postInit(ChannelPipeline ch);
+}
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/WebSocketClientBehaviorIntegrateTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/WebSocketClientBehaviorIntegrateTest.java
new file mode 100644
index 0000000..6be0b9d
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/WebSocketClientBehaviorIntegrateTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.log4j.Level;
+import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+public class WebSocketClientBehaviorIntegrateTest {
+    @Rule
+    public TestName name = new TestName();
+
+    private static final Logger logger = LoggerFactory.getLogger(WebSocketClientBehaviorIntegrateTest.class);
+    private Log4jRecordingAppender recordingAppender = null;
+    private Level previousLogLevel;
+    private SimpleWebSocketServer server;
+
+    @Before
+    public void setUp() throws InterruptedException {
+        recordingAppender = new Log4jRecordingAppender();
+        final org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
+        if (name.getMethodName().equals("shouldRemoveConnectionFromPoolWhenServerClose_WithPendingRequests") ||
+                name.getMethodName().equals("shouldNotCreateReplacementConnectionWhenClientClosesConnection")) {
+            final org.apache.log4j.Logger connectionPoolLogger = org.apache.log4j.Logger.getLogger(ConnectionPool.class);
+            final org.apache.log4j.Logger connectionLogger = org.apache.log4j.Logger.getLogger(Connection.class);
+            previousLogLevel = connectionPoolLogger.getLevel();
+            connectionPoolLogger.setLevel(Level.DEBUG);
+            connectionLogger.setLevel(Level.DEBUG);
+        }
+
+        rootLogger.addAppender(recordingAppender);
+
+        server = new SimpleWebSocketServer();
+        server.start(new TestWSGremlinInitializer());
+    }
+
+    @After
+    public void shutdown() {
+        server.stop();
+
+        // reset logger
+        final org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
+
+        if (name.getMethodName().equals("shouldRemoveConnectionFromPoolWhenServerClose_WithPendingRequests") ||
+                name.getMethodName().equals("shouldNotCreateReplacementConnectionWhenClientClosesConnection")) {
+            final org.apache.log4j.Logger connectionPoolLogger = org.apache.log4j.Logger.getLogger(ConnectionPool.class);
+            final org.apache.log4j.Logger connectionLogger = org.apache.log4j.Logger.getLogger(Connection.class);
+            connectionPoolLogger.setLevel(previousLogLevel);
+            connectionLogger.setLevel(previousLogLevel);
+        }
+
+        rootLogger.removeAppender(recordingAppender);
+    }
+
+    /**
+     * Test a scenario when server closes a connection which does not have any active requests. Such connection
+     * should be destroyed and replaced by another connection on next request.
+     */
+    @Test
+    public void shouldRemoveConnectionFromPoolWhenServerClose_WithNoPendingRequests() throws InterruptedException {
+        final Cluster cluster = Cluster.build("localhost").port(SimpleWebSocketServer.PORT)
+                .minConnectionPoolSize(1)
+                .maxConnectionPoolSize(1)
+                .serializer(Serializers.GRAPHSON_V2D0)
+                .create();
+        final Client.ClusteredClient client = cluster.connect();
+
+        // Initialize the client preemptively
+        client.init();
+
+        // assert number of connections opened
+        ConnectionPool channelPool = client.hostConnectionPools.values().stream().findFirst().get();
+        Assert.assertEquals(1, channelPool.getConnectionIDs().size());
+
+        final String originalConnectionID = channelPool.getConnectionIDs().iterator().next();
+        logger.info("On client init ConnectionIDs: " + channelPool.getConnectionIDs());
+
+        // trigger the testing server to send a WS close frame
+        Vertex v = client.submit("1", RequestOptions.build()
+                .overrideRequestId(TestWSGremlinInitializer.SINGLE_VERTEX_DELAYED_CLOSE_CONNECTION_REQUEST_ID).create())
+                .one().getVertex();
+
+        Assert.assertNotNull(v);
+
+        // assert connection is not closed yet
+        Assert.assertEquals(1, channelPool.getConnectionIDs().size());
+
+        // wait for server to send the close WS frame
+        Thread.sleep(6000);
+
+        // assert that original connection is not part of the connection pool any more
+        Assert.assertFalse("The original connection should have been closed by the server.",
+                channelPool.getConnectionIDs().contains(originalConnectionID));
+
+        // assert sanity after connection replacement
+        v = client.submit("1",
+                RequestOptions.build().overrideRequestId(TestWSGremlinInitializer.SINGLE_VERTEX_REQUEST_ID).create())
+                .one().getVertex();
+        Assert.assertNotNull(v);
+    }
+
+    /**
+     * Tests a scenario when the connection a faulty connection replaced by a new connection.
+     * Ensures that the creation of a new replacement channel only happens once.
+     */
+    @Test
+    public void shouldRemoveConnectionFromPoolWhenServerClose_WithPendingRequests() throws InterruptedException, ExecutionException {
+        final Cluster cluster = Cluster.build("localhost").port(SimpleWebSocketServer.PORT)
+                .minConnectionPoolSize(1)
+                .maxConnectionPoolSize(1)
+                .serializer(Serializers.GRAPHSON_V2D0)
+                .create();
+
+        final Client.ClusteredClient client = cluster.connect();
+
+        // Initialize the client preemptively
+        client.init();
+
+        // assert number of connections opened
+        ConnectionPool channelPool = client.hostConnectionPools.values().stream().findFirst().get();
+        Assert.assertEquals(1, channelPool.getConnectionIDs().size());
+
+        // Send two requests in flight. Both should error out.
+        CompletableFuture<ResultSet> req1 = client.submitAsync("1", RequestOptions.build()
+                .overrideRequestId(TestWSGremlinInitializer.CLOSE_CONNECTION_REQUEST_ID).create());
+        CompletableFuture<ResultSet> req2 = client.submitAsync("1", RequestOptions.build()
+                .overrideRequestId(TestWSGremlinInitializer.CLOSE_CONNECTION_REQUEST_ID_2).create());
+
+
+        // assert both are sent on same connection
+        Assert.assertEquals(1, channelPool.getConnectionIDs().size());
+
+        // trigger write for both requests
+        req1.get();
+        req2.get();
+
+        // wait for close message to arrive from server
+        Thread.sleep(2000);
+
+        // Assert that we should consider creating a connection only once, since only one connection is being closed.
+        Assert.assertEquals(1, recordingAppender.getMessages().stream().filter(str -> str.contains("Considering new connection on")).count());
+
+        // assert sanity after connection replacement
+        Vertex v = client.submit("1",
+                RequestOptions.build().overrideRequestId(TestWSGremlinInitializer.SINGLE_VERTEX_REQUEST_ID).create())
+                .one().getVertex();
+        Assert.assertNotNull(v);
+    }
+
+    /**
+     * Tests the scenario when client intentionally closes the connection. In this case, the
+     * connection should not be recycled.
+     */
+    @Test
+    public void shouldNotCreateReplacementConnectionWhenClientClosesConnection() throws ExecutionException, InterruptedException {
+        final Cluster cluster = Cluster.build("localhost").port(SimpleWebSocketServer.PORT)
+                .minConnectionPoolSize(1)
+                .maxConnectionPoolSize(1)
+                .serializer(Serializers.GRAPHSON_V2D0)
+                .create();
+        final Client.ClusteredClient client = cluster.connect();
+
+        // Initialize the client preemptively
+        client.init();
+
+        // assert number of connections opened
+        ConnectionPool channelPool = client.hostConnectionPools.values().stream().findFirst().get();
+        Assert.assertEquals(1, channelPool.getConnectionIDs().size());
+
+        // close the connection pool in an authentic manner
+        channelPool.closeAsync().get();
+
+        // wait for channel closure callback to trigger
+        Thread.sleep(2000);
+
+        Assert.assertEquals("OnClose callback should be called but only once", 1,
+                recordingAppender.getMessages().stream()
+                        .filter(str -> str.contains("OnChannelClose callback called for channel"))
+                        .count());
+
+        Assert.assertEquals("No new connection creation should be started", 0,
+                recordingAppender.getMessages().stream()
+                        .filter(str -> str.contains("Considering new connection on"))
+                        .count());
+    }
+}
\ No newline at end of file
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/util/Log4jRecordingAppender.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/util/Log4jRecordingAppender.java
new file mode 100644
index 0000000..b4ea986
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/util/Log4jRecordingAppender.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.util;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Provides a way to gather logging events for purpose of testing log output.
+ *
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public class Log4jRecordingAppender extends AppenderSkeleton {
+    private final List<String> messages = new ArrayList<>();
+    private final List<LoggingEvent> events = new ArrayList<>();
+
+    public Log4jRecordingAppender() {
+        super();
+        setLayout(new PatternLayout("%p - %m%n")); // note the EOLN char(s) appended
+    }
+
+    @Override
+    protected void append(final LoggingEvent event) {
+        messages.add(layout.format(event));
+        events.add(event);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public boolean requiresLayout() {
+        return true;
+    }
+
+    public List<String> getMessages() { return messages; }
+
+    public List<LoggingEvent> getEvents() { return events; }
+
+    public void clear() {
+        messages.clear();
+    }
+
+    /**
+     * @param regex not null
+     * @return true if there is a substring of a message matching the regular expression, where:
+     *         . matches also the EOLN char(s) defined in the layout.
+     *         $ matches the end of the string
+     */
+    public boolean logContainsAny(final String regex) {
+        Pattern pattern;
+
+        pattern = Pattern.compile( regex, Pattern.DOTALL );
+
+        return messages.stream().anyMatch(m -> pattern.matcher( m ).find());
+    }
+
+    public boolean logContainsAny(final String loggerName, final Level level, final String fragment) {
+        return events.stream().anyMatch(m -> m.getLoggerName().equals(loggerName) &&
+                m.getLevel().equals(level) && m.getMessage().toString().contains(fragment));
+    }
+    public boolean logMatchesAny(final String loggerName, final Level level, final String regex) {
+        return events.stream().anyMatch(m -> m.getLoggerName().equals(loggerName) &&
+                m.getLevel().equals(level) && m.getMessage().toString().matches(regex));
+    }
+}
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/util/Log4jRecordingAppenderTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/util/Log4jRecordingAppenderTest.java
new file mode 100644
index 0000000..dee87eb
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/util/Log4jRecordingAppenderTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.util;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public class Log4jRecordingAppenderTest {
+    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(Log4jRecordingAppenderTest.class);
+    private Log4jRecordingAppender recordingAppender = null;
+    private static final String lineSeparator = System.getProperty("line.separator");
+
+    private Level originalConfiguredLevel = null;
+
+    @Before
+    public void setupForEachTest() {
+        recordingAppender = new Log4jRecordingAppender();
+        final Logger rootLogger = Logger.getRootLogger();
+        if (null == originalConfiguredLevel) originalConfiguredLevel = rootLogger.getLevel();
+        rootLogger.addAppender(recordingAppender);
+        rootLogger.setLevel(Level.ALL);
+
+        logger.error("ERROR");
+        logger.warn("WARN");
+        logger.info("INFO");
+    }
+
+    @After
+    public void teardownForEachTest() {
+        final Logger rootLogger = Logger.getRootLogger();
+        rootLogger.removeAppender(recordingAppender);
+        rootLogger.setLevel(originalConfiguredLevel);
+    }
+
+    @Test
+    public void shouldRecordMessages() {
+        assertEquals(3, recordingAppender.getMessages().size());
+        assertEquals("ERROR - ERROR" + lineSeparator, recordingAppender.getMessages().get(0));
+        assertEquals("WARN - WARN"  + lineSeparator, recordingAppender.getMessages().get(1));
+        assertEquals("INFO - INFO" + lineSeparator, recordingAppender.getMessages().get(2));
+    }
+
+    @Test
+    public void shouldMatchAnyMessages() {
+        assertTrue(recordingAppender.logContainsAny("ERROR.*"));
+    }
+
+    @Test
+    public void shouldMatchNoMessages() {
+        assertFalse(recordingAppender.logContainsAny("this is not here"));
+    }
+
+    @Test
+    public void shouldClearMessages() {
+        assertEquals(3, recordingAppender.getMessages().size());
+        recordingAppender.clear();
+        assertEquals(0, recordingAppender.getMessages().size());
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/auth/Krb5Authenticator.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/auth/Krb5Authenticator.java
index 0cadc99..6a339c4 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/auth/Krb5Authenticator.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/auth/Krb5Authenticator.java
@@ -112,7 +112,10 @@ public class Krb5Authenticator implements Authenticator {
                 // Sasl.SERVER_AUTH fixed to true (mutual authentication) and one can configure SSL for enhanced confidentiality,
                 // Sasl policy properties for negotiating the authenticatin mechanism are not relevant here, because
                 // GSSAPI is the only available mechanism for this authenticator
-                final Map props = new HashMap<String, Object>();
+                final Map<String, Object> props = new HashMap<>();
+                if (principalName == null) {
+                    throw new IllegalArgumentException("Principal name cannot be empty. Use principal name of format 'service/fqdn@kdcrealm'");
+                }
                 final String[] principalParts = principalName.split("/|@");
                 if (principalParts.length < 3) throw new IllegalArgumentException("Use principal name of format 'service/fqdn@kdcrealm'");
                 saslServer = Sasl.createSaslServer(mechanism, principalParts[0], principalParts[1], props, Krb5SaslAuthenticator.this);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
index 6e9a262..87042dc 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
@@ -25,7 +25,6 @@ import org.apache.tinkerpop.gremlin.server.AbstractGremlinServerIntegrationTest;
 import org.apache.tinkerpop.gremlin.server.TestClientFactory;
 import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
index 1b29164..667c69f 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
@@ -131,7 +131,9 @@ public abstract class AbstractGremlinServerIntegrationTest {
     }
 
     public void stopServer() throws Exception {
-        server.stop().join();
+        if (server != null) {
+            server.stop().join();
+        }
         // reset the OpLoader processors so that they can get reconfigured on startup - Settings may have changed
         // between tests
         OpLoader.reset();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 86ac809..d747913 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -204,42 +204,46 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     @Test
     public void shouldReportErrorWhenRequestCantBeSerialized() throws Exception {
         final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create();
-        final Client client = cluster.connect().alias("g");
-
         try {
-            final Map<String,Object> params = new HashMap<>();
-            params.put("r", Color.RED);
-            client.submit("r", params).all().get();
-            fail("Should have thrown exception over bad serialization");
-        } catch (Exception ex) {
-            final Throwable inner = ExceptionUtils.getRootCause(ex);
-            assertThat(inner, instanceOf(ResponseException.class));
-            assertEquals(ResponseStatusCode.REQUEST_ERROR_SERIALIZATION, ((ResponseException) inner).getResponseStatusCode());
-            assertTrue(ex.getMessage().contains("An error occurred during serialization of this request"));
-        }
+            final Client client = cluster.connect().alias("g");
 
-        // should not die completely just because we had a bad serialization error.  that kind of stuff happens
-        // from time to time, especially in the console if you're just exploring.
-        assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+            try {
+                final Map<String, Object> params = new HashMap<>();
+                params.put("r", Color.RED);
+                client.submit("r", params).all().get();
+                fail("Should have thrown exception over bad serialization");
+            } catch (Exception ex) {
+                final Throwable inner = ExceptionUtils.getRootCause(ex);
+                assertThat(inner, instanceOf(ResponseException.class));
+                assertEquals(ResponseStatusCode.REQUEST_ERROR_SERIALIZATION, ((ResponseException) inner).getResponseStatusCode());
+                assertTrue(ex.getMessage().contains("An error occurred during serialization of this request"));
+            }
 
-        cluster.close();
+            // should not die completely just because we had a bad serialization error.  that kind of stuff happens
+            // from time to time, especially in the console if you're just exploring.
+            assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
     public void shouldProcessEvalTimeoutOverride() throws Exception {
         final Cluster cluster = TestClientFactory.open();
         final Client client = cluster.connect();
-        final RequestOptions options = RequestOptions.build().timeout(500).create();
-
         try {
-            client.submit("Thread.sleep(5000);'done'", options).all().get();
-            fail("Should have timed out");
-        } catch (Exception ex) {
-            final ResponseException re = (ResponseException) ex.getCause();
-            assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, re.getResponseStatusCode());
-        }
+            final RequestOptions options = RequestOptions.build().timeout(500).create();
 
-        cluster.close();
+            try {
+                client.submit("Thread.sleep(5000);'done'", options).all().get();
+                fail("Should have timed out");
+            } catch (Exception ex) {
+                final ResponseException re = (ResponseException) ex.getCause();
+                assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, re.getResponseStatusCode());
+            }
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -253,9 +257,9 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         } catch (Exception ex) {
             final ResponseException re = (ResponseException) ex.getCause();
             assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, re.getResponseStatusCode());
+        } finally {
+            cluster.close();
         }
-
-        cluster.close();
     }
 
     @Test
@@ -269,9 +273,9 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         } catch (Exception ex) {
             final ResponseException re = (ResponseException) ex.getCause();
             assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, re.getResponseStatusCode());
+        } finally {
+            cluster.close();
         }
-
-        cluster.close();
     }
 
     @Test
@@ -283,25 +287,27 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
                 maxConnectionPoolSize(1).
                 keepAliveInterval(1000).create();
         final Client client = cluster.connect();
+        try {
 
-        // fire up lots of requests so as to schedule/deschedule lots of ping jobs
-        for (int ix = 0; ix < 500; ix++) {
-            assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
-        }
-
-        // don't send any messages for a bit so that the driver pings in the background
-        Thread.sleep(3000);
+            // fire up lots of requests so as to schedule/deschedule lots of ping jobs
+            for (int ix = 0; ix < 500; ix++) {
+                assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+            }
 
-        // make sure no bonus messages sorta fire off once we get back to sending requests
-        for (int ix = 0; ix < 500; ix++) {
-            assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
-        }
+            // don't send any messages for a bit so that the driver pings in the background
+            Thread.sleep(3000);
 
-        // there really shouldn't be more than 3 of these sent. should definitely be at least one though
-        final long messages = recordingAppender.getMessages().stream().filter(m -> m.contains("Received response from keep-alive request")).count();
-        assertThat(messages, allOf(greaterThan(0L), lessThanOrEqualTo(3L)));
+            // make sure no bonus messages sorta fire off once we get back to sending requests
+            for (int ix = 0; ix < 500; ix++) {
+                assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+            }
 
-        cluster.close();
+            // there really shouldn't be more than 3 of these sent. should definitely be at least one though
+            final long messages = recordingAppender.getMessages().stream().filter(m -> m.contains("Received response from keep-alive request")).count();
+            assertThat(messages, allOf(greaterThan(0L), lessThanOrEqualTo(3L)));
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -312,16 +318,19 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Client client = cluster.connect();
 
         try {
-            client.submit("def x = '';(0..<1024).each{x = x + '$it'};x").all().get();
-            fail("Request should have failed because it exceeded the max content length allowed");
-        } catch (Exception ex) {
-            final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertThat(root.getMessage(), containsString("Max frame length of 1024 has been exceeded."));
-        }
+            try {
+                client.submit("def x = '';(0..<1024).each{x = x + '$it'};x").all().get();
+                fail("Request should have failed because it exceeded the max content length allowed");
+            } catch (Exception ex) {
+                final Throwable root = ExceptionUtils.getRootCause(ex);
+                assertThat(root.getMessage(), containsString("Max frame length of 1024 has been exceeded."));
+            }
 
-        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
+            assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
 
-        cluster.close();
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -329,20 +338,23 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.open();
         final Client client = cluster.connect();
 
-        // tested independently to 10000 iterations but for speed, bumped back to 1000
-        IntStream.range(0,1000).forEach(i -> {
-            try {
-                client.submit("1 + 9 9").all().join().get(0).getInt();
-                fail("Should not have gone through due to syntax error");
-            } catch (Exception ex) {
-                final Throwable root = ExceptionUtils.getRootCause(ex);
-                assertThat(root, instanceOf(ResponseException.class));
-            }
-        });
+        try {
+            // tested independently to 10000 iterations but for speed, bumped back to 1000
+            IntStream.range(0, 1000).forEach(i -> {
+                try {
+                    client.submit("1 + 9 9").all().join().get(0).getInt();
+                    fail("Should not have gone through due to syntax error");
+                } catch (Exception ex) {
+                    final Throwable root = ExceptionUtils.getRootCause(ex);
+                    assertThat(root, instanceOf(ResponseException.class));
+                }
+            });
 
-        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
+            assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
 
-        cluster.close();
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -353,30 +365,33 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Client client = cluster.connect();
 
         try {
-            client.submit("1+1").all().join().get(0).getInt();
-            fail("Should not have gone through because the server is not running");
-        } catch (Exception i) {
-            final Throwable root = ExceptionUtils.getRootCause(i);
-            assertThat(root, instanceOf(TimeoutException.class));
-        }
+            try {
+                client.submit("1+1").all().join().get(0).getInt();
+                fail("Should not have gone through because the server is not running");
+            } catch (Exception i) {
+                final Throwable root = ExceptionUtils.getRootCause(i);
+                assertThat(root, instanceOf(TimeoutException.class));
+            }
 
-        startServer();
+            startServer();
 
-        // default reconnect time is 1 second so wait some extra time to be sure it has time to try to bring it
-        // back to life. usually this passes on the first attempt, but docker is sometimes slow and we get failures
-        // waiting for Gremlin Server to pop back up
-        for (int ix = 3; ix < 13; ix++) {
-            TimeUnit.SECONDS.sleep(ix);
-            try {
-                final int result = client.submit("1+1").all().join().get(0).getInt();
-                assertEquals(2, result);
-                break;
-            } catch (Exception ignored) {
-                logger.warn("Attempt {} failed on shouldEventuallySucceedOnSameServerWithDefault", ix);
+            // default reconnect time is 1 second so wait some extra time to be sure it has time to try to bring it
+            // back to life. usually this passes on the first attempt, but docker is sometimes slow and we get failures
+            // waiting for Gremlin Server to pop back up
+            for (int ix = 3; ix < 13; ix++) {
+                TimeUnit.SECONDS.sleep(ix);
+                try {
+                    final int result = client.submit("1+1").all().join().get(0).getInt();
+                    assertEquals(2, result);
+                    break;
+                } catch (Exception ignored) {
+                    logger.warn("Attempt {} failed on shouldEventuallySucceedOnSameServerWithDefault", ix);
+                }
             }
-        }
 
-        cluster.close();
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -417,16 +432,19 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     public void shouldEventuallySucceedWithRoundRobin() throws Exception {
         final String noGremlinServer = "74.125.225.19";
         final Cluster cluster = TestClientFactory.build().addContactPoint(noGremlinServer).create();
-        final Client client = cluster.connect();
 
-        // the first host is dead on init.  request should succeed on localhost
-        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
-        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
-        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
-        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
-        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
-
-        cluster.close();
+        try {
+            final Client client = cluster.connect();
+
+            // the first host is dead on init.  request should succeed on localhost
+            assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
+            assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
+            assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
+            assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
+            assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -434,38 +452,41 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.open();
         final Client client = cluster.connect();
 
-        final String script = "g.V().drop().iterate();\n" +
-                "\n" +
-                "List ids = new ArrayList();\n" +
-                "\n" +
-                "int ii = 0;\n" +
-                "Vertex v = graph.addVertex();\n" +
-                "v.property(\"ii\", ii);\n" +
-                "v.property(\"sin\", Math.sin(ii));\n" +
-                "ids.add(v.id());\n" +
-                "\n" +
-                "Random rand = new Random();\n" +
-                "for (; ii < size; ii++) {\n" +
-                "    v = graph.addVertex();\n" +
-                "    v.property(\"ii\", ii);\n" +
-                "    v.property(\"sin\", Math.sin(ii/5.0));\n" +
-                "    Vertex u = graph.vertices(ids.get(rand.nextInt(ids.size()))).next();\n" +
-                "    v.addEdge(\"linked\", u);\n" +
-                "    ids.add(u.id());\n" +
-                "    ids.add(v.id());\n" +
-                "}\n" +
-                "g.V()";
-
-        final List<Integer> sizes = Arrays.asList(1, 10, 20, 50, 75, 100, 250, 500, 750, 1000, 5000, 10000);
-        for (Integer size : sizes) {
-            final Map<String, Object> params = new HashMap<>();
-            params.put("size", size - 1);
-            final ResultSet results = client.submit(script, params);
-
-            assertEquals(size.intValue(), results.all().get().size());
-        }
+        try {
 
-        cluster.close();
+            final String script = "g.V().drop().iterate();\n" +
+                    "\n" +
+                    "List ids = new ArrayList();\n" +
+                    "\n" +
+                    "int ii = 0;\n" +
+                    "Vertex v = graph.addVertex();\n" +
+                    "v.property(\"ii\", ii);\n" +
+                    "v.property(\"sin\", Math.sin(ii));\n" +
+                    "ids.add(v.id());\n" +
+                    "\n" +
+                    "Random rand = new Random();\n" +
+                    "for (; ii < size; ii++) {\n" +
+                    "    v = graph.addVertex();\n" +
+                    "    v.property(\"ii\", ii);\n" +
+                    "    v.property(\"sin\", Math.sin(ii/5.0));\n" +
+                    "    Vertex u = graph.vertices(ids.get(rand.nextInt(ids.size()))).next();\n" +
+                    "    v.addEdge(\"linked\", u);\n" +
+                    "    ids.add(u.id());\n" +
+                    "    ids.add(v.id());\n" +
+                    "}\n" +
+                    "g.V()";
+
+            final List<Integer> sizes = Arrays.asList(1, 10, 20, 50, 75, 100, 250, 500, 750, 1000, 5000, 10000);
+            for (Integer size : sizes) {
+                final Map<String, Object> params = new HashMap<>();
+                params.put("size", size - 1);
+                final ResultSet results = client.submit(script, params);
+
+                assertEquals(size.intValue(), results.all().get().size());
+            }
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -473,126 +494,141 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.open();
         final Client client = cluster.connect();
 
-        final ResultSet results = client.submit("java.awt.Color.RED");
-
         try {
-            results.all().join();
-            fail("Should have thrown exception over bad serialization");
-        } catch (Exception ex) {
-            final Throwable inner = ExceptionUtils.getRootCause(ex);
-            assertThat(inner, instanceOf(ResponseException.class));
-            assertEquals(ResponseStatusCode.SERVER_ERROR_SERIALIZATION, ((ResponseException) inner).getResponseStatusCode());
-        }
 
-        // should not die completely just because we had a bad serialization error.  that kind of stuff happens
-        // from time to time, especially in the console if you're just exploring.
-        assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+            final ResultSet results = client.submit("java.awt.Color.RED");
 
-        cluster.close();
+            try {
+                results.all().join();
+                fail("Should have thrown exception over bad serialization");
+            } catch (Exception ex) {
+                final Throwable inner = ExceptionUtils.getRootCause(ex);
+                assertThat(inner, instanceOf(ResponseException.class));
+                assertEquals(ResponseStatusCode.SERVER_ERROR_SERIALIZATION, ((ResponseException) inner).getResponseStatusCode());
+            }
+
+            // should not die completely just because we had a bad serialization error.  that kind of stuff happens
+            // from time to time, especially in the console if you're just exploring.
+            assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
     public void shouldFailWithScriptExecutionException() throws Exception {
         final Cluster cluster = TestClientFactory.open();
         final Client client = cluster.connect();
-
-        final ResultSet results = client.submit("1/0");
-
         try {
-            results.all().join();
-            fail("Should have thrown exception over bad serialization");
-        } catch (Exception ex) {
-            final Throwable inner = ExceptionUtils.getRootCause(ex);
-            assertTrue(inner instanceof ResponseException);
-            assertThat(inner.getMessage(), endsWith("Division by zero"));
-
-            final ResponseException rex = (ResponseException) inner;
-            assertEquals("java.lang.ArithmeticException", rex.getRemoteExceptionHierarchy().get().get(0));
-            assertEquals(1, rex.getRemoteExceptionHierarchy().get().size());
-            assertThat(rex.getRemoteStackTrace().get(), startsWith("java.lang.ArithmeticException: Division by zero\n\tat java.math.BigDecimal.divide(BigDecimal.java"));
-        }
+            final ResultSet results = client.submit("1/0");
 
-        // should not die completely just because we had a bad serialization error.  that kind of stuff happens
-        // from time to time, especially in the console if you're just exploring.
-        assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+            try {
+                results.all().join();
+                fail("Should have thrown exception over bad serialization");
+            } catch (Exception ex) {
+                final Throwable inner = ExceptionUtils.getRootCause(ex);
+                assertTrue(inner instanceof ResponseException);
+                assertThat(inner.getMessage(), endsWith("Division by zero"));
+
+                final ResponseException rex = (ResponseException) inner;
+                assertEquals("java.lang.ArithmeticException", rex.getRemoteExceptionHierarchy().get().get(0));
+                assertEquals(1, rex.getRemoteExceptionHierarchy().get().size());
+                assertThat(rex.getRemoteStackTrace().get(), startsWith("java.lang.ArithmeticException: Division by zero\n\tat java.math.BigDecimal.divide(BigDecimal.java"));
+            }
 
-        cluster.close();
+            // should not die completely just because we had a bad serialization error.  that kind of stuff happens
+            // from time to time, especially in the console if you're just exploring.
+            assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
     public void shouldProcessRequestsOutOfOrder() throws Exception {
         final Cluster cluster = TestClientFactory.open();
-        final Client client = cluster.connect();
+        try {
+            final Client client = cluster.connect();
 
-        final ResultSet rsFive = client.submit("Thread.sleep(5000);'five'");
-        final ResultSet rsZero = client.submit("'zero'");
+            final ResultSet rsFive = client.submit("Thread.sleep(5000);'five'");
+            final ResultSet rsZero = client.submit("'zero'");
 
-        final CompletableFuture<List<Result>> futureFive = rsFive.all();
-        final CompletableFuture<List<Result>> futureZero = rsZero.all();
+            final CompletableFuture<List<Result>> futureFive = rsFive.all();
+            final CompletableFuture<List<Result>> futureZero = rsZero.all();
 
-        final long start = System.nanoTime();
-        assertFalse(futureFive.isDone());
-        assertEquals("zero", futureZero.get().get(0).getString());
+            final long start = System.nanoTime();
+            assertFalse(futureFive.isDone());
+            assertEquals("zero", futureZero.get().get(0).getString());
 
-        logger.info("Eval of 'zero' complete: " + TimeUtil.millisSince(start));
+            logger.info("Eval of 'zero' complete: " + TimeUtil.millisSince(start));
 
-        assertFalse(futureFive.isDone());
-        assertEquals("five", futureFive.get(10, TimeUnit.SECONDS).get(0).getString());
+            assertFalse(futureFive.isDone());
+            assertEquals("five", futureFive.get(10, TimeUnit.SECONDS).get(0).getString());
 
-        logger.info("Eval of 'five' complete: " + TimeUtil.millisSince(start));
+            logger.info("Eval of 'five' complete: " + TimeUtil.millisSince(start));
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
     public void shouldProcessSessionRequestsInOrder() throws Exception {
         final Cluster cluster = TestClientFactory.open();
-        final Client client = cluster.connect(name.getMethodName());
+        try {
+            final Client client = cluster.connect(name.getMethodName());
 
-        final ResultSet rsFive = client.submit("Thread.sleep(5000);'five'");
-        final ResultSet rsZero = client.submit("'zero'");
+            final ResultSet rsFive = client.submit("Thread.sleep(5000);'five'");
+            final ResultSet rsZero = client.submit("'zero'");
 
-        final CompletableFuture<List<Result>> futureFive = rsFive.all();
-        final CompletableFuture<List<Result>> futureZero = rsZero.all();
+            final CompletableFuture<List<Result>> futureFive = rsFive.all();
+            final CompletableFuture<List<Result>> futureZero = rsZero.all();
 
-        final CountDownLatch latch = new CountDownLatch(2);
-        final List<String> order = new ArrayList<>();
-        final ExecutorService executor = Executors.newSingleThreadExecutor();
+            final CountDownLatch latch = new CountDownLatch(2);
+            final List<String> order = new ArrayList<>();
+            final ExecutorService executor = Executors.newSingleThreadExecutor();
 
-        futureFive.thenAcceptAsync(r -> {
-            order.add(r.get(0).getString());
-            latch.countDown();
-        }, executor);
+            futureFive.thenAcceptAsync(r -> {
+                order.add(r.get(0).getString());
+                latch.countDown();
+            }, executor);
 
-        futureZero.thenAcceptAsync(r -> {
-            order.add(r.get(0).getString());
-            latch.countDown();
-        }, executor);
+            futureZero.thenAcceptAsync(r -> {
+                order.add(r.get(0).getString());
+                latch.countDown();
+            }, executor);
 
-        // wait for both results
-        latch.await(30000, TimeUnit.MILLISECONDS);
+            // wait for both results
+            latch.await(30000, TimeUnit.MILLISECONDS);
 
-        // should be two results
-        assertEquals(2, order.size());
+            // should be two results
+            assertEquals(2, order.size());
 
-        // ensure that "five" is first then "zero"
-        assertThat(order, contains("five", "zero"));
+            // ensure that "five" is first then "zero"
+            assertThat(order, contains("five", "zero"));
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
     public void shouldWaitForAllResultsToArrive() throws Exception {
         final Cluster cluster = TestClientFactory.open();
-        final Client client = cluster.connect();
+        try {
+            final Client client = cluster.connect();
+
+            final AtomicInteger checked = new AtomicInteger(0);
+            final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+            while (!results.allItemsAvailable()) {
+                assertTrue(results.getAvailableItemCount() < 10);
+                checked.incrementAndGet();
+                Thread.sleep(100);
+            }
 
-        final AtomicInteger checked = new AtomicInteger(0);
-        final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
-        while (!results.allItemsAvailable()) {
-            assertTrue(results.getAvailableItemCount() < 10);
-            checked.incrementAndGet();
-            Thread.sleep(100);
+            assertTrue(checked.get() > 0);
+            assertEquals(9, results.getAvailableItemCount());
+        } finally {
+            cluster.close();
         }
-
-        assertTrue(checked.get() > 0);
-        assertEquals(9, results.getAvailableItemCount());
-        cluster.close();
     }
 
     @Test
@@ -600,17 +636,20 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.build().channelizer(Channelizer.NioChannelizer.class.getName()).create();
         final Client client = cluster.connect();
 
-        final AtomicInteger checked = new AtomicInteger(0);
-        final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
-        while (!results.allItemsAvailable()) {
-            assertTrue(results.getAvailableItemCount() < 10);
-            checked.incrementAndGet();
-            Thread.sleep(100);
-        }
+        try {
+            final AtomicInteger checked = new AtomicInteger(0);
+            final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+            while (!results.allItemsAvailable()) {
+                assertTrue(results.getAvailableItemCount() < 10);
+                checked.incrementAndGet();
+                Thread.sleep(100);
+            }
 
-        assertTrue(checked.get() > 0);
-        assertEquals(9, results.getAvailableItemCount());
-        cluster.close();
+            assertTrue(checked.get() > 0);
+            assertEquals(9, results.getAvailableItemCount());
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -618,16 +657,18 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.open();
         final Client client = cluster.connect();
 
-        final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
-        final AtomicInteger counter = new AtomicInteger(0);
-        results.stream().map(i -> i.get(Integer.class) * 2).forEach(i -> assertEquals(counter.incrementAndGet() * 2, Integer.parseInt(i.toString())));
-        assertEquals(9, counter.get());
-        assertThat(results.allItemsAvailable(), is(true));
-
-        // cant stream it again
-        assertThat(results.stream().iterator().hasNext(), is(false));
-
-        cluster.close();
+        try {
+            final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+            final AtomicInteger counter = new AtomicInteger(0);
+            results.stream().map(i -> i.get(Integer.class) * 2).forEach(i -> assertEquals(counter.incrementAndGet() * 2, Integer.parseInt(i.toString())));
+            assertEquals(9, counter.get());
+            assertThat(results.allItemsAvailable(), is(true));
+
+            // cant stream it again
+            assertThat(results.stream().iterator().hasNext(), is(false));
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -635,21 +676,23 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.open();
         final Client client = cluster.connect();
 
-        final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
-        final Iterator<Result> itty = results.iterator();
-        final AtomicInteger counter = new AtomicInteger(0);
-        while (itty.hasNext()) {
-            counter.incrementAndGet();
-            assertEquals(counter.get(), itty.next().getInt());
-        }
-
-        assertEquals(9, counter.get());
-        assertThat(results.allItemsAvailable(), is(true));
+        try {
+            final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+            final Iterator<Result> itty = results.iterator();
+            final AtomicInteger counter = new AtomicInteger(0);
+            while (itty.hasNext()) {
+                counter.incrementAndGet();
+                assertEquals(counter.get(), itty.next().getInt());
+            }
 
-        // can't stream it again
-        assertThat(results.iterator().hasNext(), is(false));
+            assertEquals(9, counter.get());
+            assertThat(results.allItemsAvailable(), is(true));
 
-        cluster.close();
+            // can't stream it again
+            assertThat(results.iterator().hasNext(), is(false));
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -657,27 +700,29 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.open();
         final Client client = cluster.connect();
 
-        final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
-        final CompletableFuture<List<Result>> batch1 = results.some(5);
-        final CompletableFuture<List<Result>> batch2 = results.some(5);
-        final CompletableFuture<List<Result>> batchNothingLeft = results.some(5);
-
-        assertEquals(5, batch1.get().size());
-        assertEquals(1, batch1.get().get(0).getInt());
-        assertEquals(2, batch1.get().get(1).getInt());
-        assertEquals(3, batch1.get().get(2).getInt());
-        assertEquals(4, batch1.get().get(3).getInt());
-        assertEquals(5, batch1.get().get(4).getInt());
-
-        assertEquals(4, batch2.get().size());
-        assertEquals(6, batch2.get().get(0).getInt());
-        assertEquals(7, batch2.get().get(1).getInt());
-        assertEquals(8, batch2.get().get(2).getInt());
-        assertEquals(9, batch2.get().get(3).getInt());
-
-        assertEquals(0, batchNothingLeft.get().size());
-
-        cluster.close();
+        try {
+            final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+            final CompletableFuture<List<Result>> batch1 = results.some(5);
+            final CompletableFuture<List<Result>> batch2 = results.some(5);
+            final CompletableFuture<List<Result>> batchNothingLeft = results.some(5);
+
+            assertEquals(5, batch1.get().size());
+            assertEquals(1, batch1.get().get(0).getInt());
+            assertEquals(2, batch1.get().get(1).getInt());
+            assertEquals(3, batch1.get().get(2).getInt());
+            assertEquals(4, batch1.get().get(3).getInt());
+            assertEquals(5, batch1.get().get(4).getInt());
+
+            assertEquals(4, batch2.get().size());
+            assertEquals(6, batch2.get().get(0).getInt());
+            assertEquals(7, batch2.get().get(1).getInt());
+            assertEquals(8, batch2.get().get(2).getInt());
+            assertEquals(9, batch2.get().get(3).getInt());
+
+            assertEquals(0, batchNothingLeft.get().size());
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -685,29 +730,31 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.open();
         final Client client = cluster.connect();
 
-        final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
-        final Result one = results.one();
-        final CompletableFuture<List<Result>> batch1 = results.some(4);
-        final CompletableFuture<List<Result>> batch2 = results.some(5);
-        final CompletableFuture<List<Result>> batchNothingLeft = results.some(5);
-
-        assertEquals(1, one.getInt());
-
-        assertEquals(4, batch1.get().size());
-        assertEquals(2, batch1.get().get(0).getInt());
-        assertEquals(3, batch1.get().get(1).getInt());
-        assertEquals(4, batch1.get().get(2).getInt());
-        assertEquals(5, batch1.get().get(3).getInt());
-
-        assertEquals(4, batch2.get().size());
-        assertEquals(6, batch2.get().get(0).getInt());
-        assertEquals(7, batch2.get().get(1).getInt());
-        assertEquals(8, batch2.get().get(2).getInt());
-        assertEquals(9, batch2.get().get(3).getInt());
-
-        assertEquals(0, batchNothingLeft.get().size());
-
-        cluster.close();
+        try {
+            final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+            final Result one = results.one();
+            final CompletableFuture<List<Result>> batch1 = results.some(4);
+            final CompletableFuture<List<Result>> batch2 = results.some(5);
+            final CompletableFuture<List<Result>> batchNothingLeft = results.some(5);
+
+            assertEquals(1, one.getInt());
+
+            assertEquals(4, batch1.get().size());
+            assertEquals(2, batch1.get().get(0).getInt());
+            assertEquals(3, batch1.get().get(1).getInt());
+            assertEquals(4, batch1.get().get(2).getInt());
+            assertEquals(5, batch1.get().get(3).getInt());
+
+            assertEquals(4, batch2.get().size());
+            assertEquals(6, batch2.get().get(0).getInt());
+            assertEquals(7, batch2.get().get(1).getInt());
+            assertEquals(8, batch2.get().get(2).getInt());
+            assertEquals(9, batch2.get().get(3).getInt());
+
+            assertEquals(0, batchNothingLeft.get().size());
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -734,70 +781,80 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
                 .workerPoolSize(workerPoolSizeForDriver)
                 .maxWaitForConnection(300000)
                 .create();
-        final Client client = cluster.connect();
-
-        final CountDownLatch latch = new CountDownLatch(requests);
-        final AtomicReference[] refs = new AtomicReference[requests];
-        IntStream.range(0, requests).forEach(ix -> {
-            refs[ix] = new AtomicReference();
-            client.submitAsync("Thread.sleep(5000);[1,2,3,4,5,6,7,8,9]").thenAccept(rs ->
-                    rs.all().thenAccept(refs[ix]::set).thenRun(latch::countDown));
-        });
-
-        // countdown should have reached zero as results should have eventually been all returned and processed
-        assertTrue(latch.await(30, TimeUnit.SECONDS));
-
-        final List<Integer> expected = IntStream.range(1, 10).boxed().collect(Collectors.toList());
-        IntStream.range(0, requests).forEach(r ->
-                assertTrue(expected.containsAll(((List<Result>) refs[r].get()).stream().map(resultItem -> new Integer(resultItem.getInt())).collect(Collectors.toList()))));
+        try {
+            final Client client = cluster.connect();
+
+            final CountDownLatch latch = new CountDownLatch(requests);
+            final AtomicReference[] refs = new AtomicReference[requests];
+            IntStream.range(0, requests).forEach(ix -> {
+                refs[ix] = new AtomicReference();
+                client.submitAsync("Thread.sleep(5000);[1,2,3,4,5,6,7,8,9]").thenAccept(rs ->
+                        rs.all().thenAccept(refs[ix]::set).thenRun(latch::countDown));
+            });
+
+            // countdown should have reached zero as results should have eventually been all returned and processed
+            assertTrue(latch.await(30, TimeUnit.SECONDS));
+
+            final List<Integer> expected = IntStream.range(1, 10).boxed().collect(Collectors.toList());
+            IntStream.range(0, requests).forEach(r ->
+                    assertTrue(expected.containsAll(((List<Result>) refs[r].get()).stream().map(resultItem -> new Integer(resultItem.getInt())).collect(Collectors.toList()))));
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
     public void shouldCloseWithServerDown() throws Exception {
         final Cluster cluster = TestClientFactory.open();
-        cluster.connect().init();
-
-        stopServer();
+        try {
+            cluster.connect().init();
 
-        cluster.close();
+            stopServer();
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
     public void shouldMarkHostDeadSinceServerIsDown() throws Exception {
         final Cluster cluster = TestClientFactory.open();
-        assertEquals(0, cluster.availableHosts().size());
-        cluster.connect().init();
-        assertEquals(1, cluster.availableHosts().size());
-
-        stopServer();
+        try {
+            assertEquals(0, cluster.availableHosts().size());
+            cluster.connect().init();
+            assertEquals(1, cluster.availableHosts().size());
 
-        cluster.connect().init();
-        assertEquals(0, cluster.availableHosts().size());
+            stopServer();
 
-        cluster.close();
+            cluster.connect().init();
+            assertEquals(0, cluster.availableHosts().size());
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
     public void shouldFailWithBadServerSideSerialization() throws Exception {
         final Cluster cluster = TestClientFactory.open();
         final Client client = cluster.connect();
-
-        final ResultSet results = client.submit("TinkerGraph.open().variables()");
-
         try {
-            results.all().join();
-            fail();
-        } catch (Exception ex) {
-            final Throwable inner = ExceptionUtils.getRootCause(ex);
-            assertTrue(inner instanceof ResponseException);
-            assertEquals(ResponseStatusCode.SERVER_ERROR_SERIALIZATION, ((ResponseException) inner).getResponseStatusCode());
-        }
 
-        // should not die completely just because we had a bad serialization error.  that kind of stuff happens
-        // from time to time, especially in the console if you're just exploring.
-        assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+            final ResultSet results = client.submit("TinkerGraph.open().variables()");
 
-        cluster.close();
+            try {
+                results.all().join();
+                fail();
+            } catch (Exception ex) {
+                final Throwable inner = ExceptionUtils.getRootCause(ex);
+                assertTrue(inner instanceof ResponseException);
+                assertEquals(ResponseStatusCode.SERVER_ERROR_SERIALIZATION, ((ResponseException) inner).getResponseStatusCode());
+            }
+
+            // should not die completely just because we had a bad serialization error.  that kind of stuff happens
+            // from time to time, especially in the console if you're just exploring.
+            assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -810,12 +867,14 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.build().serializer(serializer).create();
         final Client client = cluster.connect();
 
-        final ResultSet resultSet = client.submit("TinkerFactory.createClassic()");
-        final List<Result> results = resultSet.all().join();
-        assertEquals(1, results.size());
-        assertEquals("tinkergraph[vertices:6 edges:6]", results.get(0).getString());
-
-        cluster.close();
+        try {
+            final ResultSet resultSet = client.submit("TinkerFactory.createClassic()");
+            final List<Result> results = resultSet.all().join();
+            assertEquals(1, results.size());
+            assertEquals("tinkergraph[vertices:6 edges:6]", results.get(0).getString());
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -828,12 +887,14 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.build().serializer(serializer).create();
         final Client client = cluster.connect();
 
-        final ResultSet resultSet = client.submit("TinkerFactory.createClassic()");
-        final List<Result> results = resultSet.all().join();
-        assertEquals(1, results.size());
-        assertEquals("tinkergraph[vertices:6 edges:6]", results.get(0).getString());
-
-        cluster.close();
+        try {
+            final ResultSet resultSet = client.submit("TinkerFactory.createClassic()");
+            final List<Result> results = resultSet.all().join();
+            assertEquals(1, results.size());
+            assertEquals("tinkergraph[vertices:6 edges:6]", results.get(0).getString());
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -846,9 +907,12 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.build().serializer(serializer).create();
         final Client client = cluster.connect();
 
-        final List<Result> json = client.submit("b = new groovy.json.JsonBuilder();b.people{person {fname 'stephen'\nlname 'mallette'}};b").all().join();
-        assertEquals("{\"people\":{\"person\":{\"fname\":\"stephen\",\"lname\":\"mallette\"}}}", json.get(0).getString());
-        cluster.close();
+        try {
+            final List<Result> json = client.submit("b = new groovy.json.JsonBuilder();b.people{person {fname 'stephen'\nlname 'mallette'}};b").all().join();
+            assertEquals("{\"people\":{\"person\":{\"fname\":\"stephen\",\"lname\":\"mallette\"}}}", json.get(0).getString());
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -861,9 +925,12 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.build().serializer(serializer).create();
         final Client client = cluster.connect();
 
-        final List<Result> json = client.submit("b = new groovy.json.JsonBuilder();b.people{person {fname 'stephen'\nlname 'mallette'}};b").all().join();
-        assertEquals("{\"people\":{\"person\":{\"fname\":\"stephen\",\"lname\":\"mallette\"}}}", json.get(0).getString());
-        cluster.close();
+        try {
+            final List<Result> json = client.submit("b = new groovy.json.JsonBuilder();b.people{person {fname 'stephen'\nlname 'mallette'}};b").all().join();
+            assertEquals("{\"people\":{\"person\":{\"fname\":\"stephen\",\"lname\":\"mallette\"}}}", json.get(0).getString());
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -871,35 +938,38 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V1D0).create();
         final Client client = cluster.connect();
 
-        final List<Result> r = client.submit("TinkerFactory.createModern().traversal().V(1)").all().join();
-        assertEquals(1, r.size());
+        try {
 
-        final Map<String,Object> m = r.get(0).get(Map.class);
-        assertEquals(4, m.size());
-        assertEquals(1, m.get("id"));
-        assertEquals("person", m.get("label"));
-        assertEquals("vertex", m.get("type"));
+            final List<Result> r = client.submit("TinkerFactory.createModern().traversal().V(1)").all().join();
+            assertEquals(1, r.size());
 
-        final Map<String,Object> properties = (Map<String,Object>) m.get("properties");
-        assertEquals(2, properties.size());
+            final Map<String, Object> m = r.get(0).get(Map.class);
+            assertEquals(4, m.size());
+            assertEquals(1, m.get("id"));
+            assertEquals("person", m.get("label"));
+            assertEquals("vertex", m.get("type"));
 
-        final List<Object> names = (List<Object>) properties.get("name");
-        assertEquals(1, names.size());
+            final Map<String, Object> properties = (Map<String, Object>) m.get("properties");
+            assertEquals(2, properties.size());
 
-        final Map<String,Object> nameProperties = (Map<String,Object>) names.get(0);
-        assertEquals(2, nameProperties.size());
-        assertEquals(0l, nameProperties.get("id"));
-        assertEquals("marko", nameProperties.get("value"));
+            final List<Object> names = (List<Object>) properties.get("name");
+            assertEquals(1, names.size());
 
-        final List<Object> ages = (List<Object>) properties.get("age");
-        assertEquals(1, ages.size());
+            final Map<String, Object> nameProperties = (Map<String, Object>) names.get(0);
+            assertEquals(2, nameProperties.size());
+            assertEquals(0l, nameProperties.get("id"));
+            assertEquals("marko", nameProperties.get("value"));
 
-        final Map<String,Object> ageProperties = (Map<String,Object>) ages.get(0);
-        assertEquals(2, ageProperties.size());
-        assertEquals(1l, ageProperties.get("id"));
-        assertEquals(29, ageProperties.get("value"));
+            final List<Object> ages = (List<Object>) properties.get("age");
+            assertEquals(1, ages.size());
 
-        cluster.close();
+            final Map<String, Object> ageProperties = (Map<String, Object>) ages.get(0);
+            assertEquals(2, ageProperties.size());
+            assertEquals(1l, ageProperties.get("id"));
+            assertEquals(29, ageProperties.get("value"));
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -907,18 +977,20 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V2D0).create();
         final Client client = cluster.connect();
 
-        final List<Result> r = client.submit("TinkerFactory.createModern().traversal().V(1)").all().join();
-        assertEquals(1, r.size());
-
-        final Vertex v = r.get(0).get(DetachedVertex.class);
-        assertEquals(1, v.id());
-        assertEquals("person", v.label());
+        try {
+            final List<Result> r = client.submit("TinkerFactory.createModern().traversal().V(1)").all().join();
+            assertEquals(1, r.size());
 
-        assertEquals(2, IteratorUtils.count(v.properties()));
-        assertEquals("marko", v.value("name"));
-        assertEquals(29, Integer.parseInt(v.value("age").toString()));
+            final Vertex v = r.get(0).get(DetachedVertex.class);
+            assertEquals(1, v.id());
+            assertEquals("person", v.label());
 
-        cluster.close();
+            assertEquals(2, IteratorUtils.count(v.properties()));
+            assertEquals("marko", v.value("name"));
+            assertEquals(29, Integer.parseInt(v.value("age").toString()));
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -926,14 +998,16 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V2D0).create();
         final Client client = cluster.connect();
 
-        final Instant now = Instant.now();
-        final List<Result> r = client.submit("java.time.Instant.ofEpochMilli(" + now.toEpochMilli() + ")").all().join();
-        assertEquals(1, r.size());
-
-        final Instant then = r.get(0).get(Instant.class);
-        assertEquals(now, then);
+        try {
+            final Instant now = Instant.now();
+            final List<Result> r = client.submit("java.time.Instant.ofEpochMilli(" + now.toEpochMilli() + ")").all().join();
+            assertEquals(1, r.size());
 
-        cluster.close();
+            final Instant then = r.get(0).get(Instant.class);
+            assertEquals(now, then);
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -941,18 +1015,20 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create();
         final Client client = cluster.connect();
 
-        final List<Result> r = client.submit("TinkerFactory.createModern().traversal().V(1)").all().join();
-        assertEquals(1, r.size());
-
-        final Vertex v = r.get(0).get(DetachedVertex.class);
-        assertEquals(1, v.id());
-        assertEquals("person", v.label());
+        try {
+            final List<Result> r = client.submit("TinkerFactory.createModern().traversal().V(1)").all().join();
+            assertEquals(1, r.size());
 
-        assertEquals(2, IteratorUtils.count(v.properties()));
-        assertEquals("marko", v.value("name"));
-        assertEquals(29, Integer.parseInt(v.value("age").toString()));
+            final Vertex v = r.get(0).get(DetachedVertex.class);
+            assertEquals(1, v.id());
+            assertEquals("person", v.label());
 
-        cluster.close();
+            assertEquals(2, IteratorUtils.count(v.properties()));
+            assertEquals("marko", v.value("name"));
+            assertEquals(29, Integer.parseInt(v.value("age").toString()));
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -960,14 +1036,16 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create();
         final Client client = cluster.connect();
 
-        final Instant now = Instant.now();
-        final List<Result> r = client.submit("java.time.Instant.ofEpochMilli(" + now.toEpochMilli() + ")").all().join();
-        assertEquals(1, r.size());
-
-        final Instant then = r.get(0).get(Instant.class);
-        assertEquals(now, then);
+        try {
+            final Instant now = Instant.now();
+            final List<Result> r = client.submit("java.time.Instant.ofEpochMilli(" + now.toEpochMilli() + ")").all().join();
+            assertEquals(1, r.size());
 
-        cluster.close();
+            final Instant then = r.get(0).get(Instant.class);
+            assertEquals(now, then);
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -975,14 +1053,16 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHBINARY_V1D0).create();
         final Client client = cluster.connect();
 
-        final List<Result> r = client.submit("TinkerFactory.createModern().traversal().V(1)").all().join();
-        assertEquals(1, r.size());
-
-        final Vertex v = r.get(0).get(ReferenceVertex.class);
-        assertEquals(1, v.id());
-        assertEquals("person", v.label());
+        try {
+            final List<Result> r = client.submit("TinkerFactory.createModern().traversal().V(1)").all().join();
+            assertEquals(1, r.size());
 
-        cluster.close();
+            final Vertex v = r.get(0).get(ReferenceVertex.class);
+            assertEquals(1, v.id());
+            assertEquals("person", v.label());
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -1149,26 +1229,29 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
         assumeNeo4jIsPresent();
 
         final Cluster cluster = TestClientFactory.open();
-        final Client sessionClient = cluster.connect(name.getMethodName());
-        final Client sessionlessClient = cluster.connect();
-
-        //open transaction in session, then add vertex and commit
-        sessionClient.submit("graph.tx().open()").all().get();
-        final Vertex vertexBeforeTx = sessionClient.submit("v=graph.addVertex(\"name\",\"stephen\")").all().get().get(0).getVertex();
-        assertEquals("stephen", vertexBeforeTx.values("name").next());
-        sessionClient.submit("graph.tx().commit()").all().get();
+        try {
+            final Client sessionClient = cluster.connect(name.getMethodName());
+            final Client sessionlessClient = cluster.connect();
 
-        // check that session transaction is closed
-        final boolean isOpen = sessionClient.submit("graph.tx().isOpen()").all().get().get(0).getBoolean();
-        assertTrue("Transaction should be closed", !isOpen);
+            //open transaction in session, then add vertex and commit
+            sessionClient.submit("graph.tx().open()").all().get();
+            final Vertex vertexBeforeTx = sessionClient.submit("v=graph.addVertex(\"name\",\"stephen\")").all().get().get(0).getVertex();
+            assertEquals("stephen", vertexBeforeTx.values("name").next());
+            sessionClient.submit("graph.tx().commit()").all().get();
 
-        //run a sessionless read
-        sessionlessClient.submit("graph.traversal().V()").all().get();
+            // check that session transaction is closed
+            final boolean isOpen = sessionClient.submit("graph.tx().isOpen()").all().get().get(0).getBoolean();
+            assertTrue("Transaction should be closed", !isOpen);
 
-        // check that session transaction is still closed
-        final boolean isOpenAfterSessionless = sessionClient.submit("graph.tx().isOpen()").all().get().get(0).getBoolean();
-        assertTrue("Transaction should stil be closed", !isOpenAfterSessionless);
+            //run a sessionless read
+            sessionlessClient.submit("graph.traversal().V()").all().get();
 
+            // check that session transaction is still closed
+            final boolean isOpenAfterSessionless = sessionClient.submit("graph.tx().isOpen()").all().get().get(0).getBoolean();
+            assertTrue("Transaction should stil be closed", !isOpenAfterSessionless);
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -1223,25 +1306,27 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     @Test
     public void shouldExecuteScriptsInMultipleSession() throws Exception {
         final Cluster cluster = TestClientFactory.open();
-        final Client client1 = cluster.connect(name.getMethodName() + "1");
-        final Client client2 = cluster.connect(name.getMethodName() + "2");
-        final Client client3 = cluster.connect(name.getMethodName() + "3");
-
-        final ResultSet results11 = client1.submit("x = 1");
-        final ResultSet results21 = client2.submit("x = 2");
-        final ResultSet results31 = client3.submit("x = 3");
-        assertEquals(1, results11.all().get().get(0).getInt());
-        assertEquals(2, results21.all().get().get(0).getInt());
-        assertEquals(3, results31.all().get().get(0).getInt());
-
-        final ResultSet results12 = client1.submit("x + 100");
-        final ResultSet results22 = client2.submit("x * 2");
-        final ResultSet results32 = client3.submit("x * 10");
-        assertEquals(101, results12.all().get().get(0).getInt());
-        assertEquals(4, results22.all().get().get(0).getInt());
-        assertEquals(30, results32.all().get().get(0).getInt());
-
-        cluster.close();
+        try {
+            final Client client1 = cluster.connect(name.getMethodName() + "1");
+            final Client client2 = cluster.connect(name.getMethodName() + "2");
+            final Client client3 = cluster.connect(name.getMethodName() + "3");
+
+            final ResultSet results11 = client1.submit("x = 1");
+            final ResultSet results21 = client2.submit("x = 2");
+            final ResultSet results31 = client3.submit("x = 3");
+            assertEquals(1, results11.all().get().get(0).getInt());
+            assertEquals(2, results21.all().get().get(0).getInt());
+            assertEquals(3, results31.all().get().get(0).getInt());
+
+            final ResultSet results12 = client1.submit("x + 100");
+            final ResultSet results22 = client2.submit("x * 2");
+            final ResultSet results32 = client3.submit("x * 10");
+            assertEquals(101, results12.all().get().get(0).getInt());
+            assertEquals(4, results22.all().get().get(0).getInt());
+            assertEquals(30, results32.all().get().get(0).getInt());
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -1391,22 +1476,24 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     public void shouldAliasTraversalSourceVariables() throws Exception {
         final Cluster cluster = TestClientFactory.open();
         final Client client = cluster.connect();
-
         try {
-            client.submit("g.addV().property('name','stephen')").all().get().get(0).getVertex();
-            fail("Should have tossed an exception because \"g\" is readonly in this context");
-        } catch (Exception ex) {
-            final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertThat(root, instanceOf(ResponseException.class));
-            final ResponseException re = (ResponseException) root;
-            assertEquals(ResponseStatusCode.SERVER_ERROR_SCRIPT_EVALUATION, re.getResponseStatusCode());
-        }
 
-        final Client clientAliased = client.alias("g1");
-        final Vertex v = clientAliased.submit("g.addV().property('name','jason')").all().get().get(0).getVertex();
-        assertEquals("jason", v.value("name"));
+            try {
+                client.submit("g.addV().property('name','stephen')").all().get().get(0).getVertex();
+                fail("Should have tossed an exception because \"g\" is readonly in this context");
+            } catch (Exception ex) {
+                final Throwable root = ExceptionUtils.getRootCause(ex);
+                assertThat(root, instanceOf(ResponseException.class));
+                final ResponseException re = (ResponseException) root;
+                assertEquals(ResponseStatusCode.SERVER_ERROR_SCRIPT_EVALUATION, re.getResponseStatusCode());
+            }
 
-        cluster.close();
+            final Client clientAliased = client.alias("g1");
+            final Vertex v = clientAliased.submit("g.addV().property('name','jason')").all().get().get(0).getVertex();
+            assertEquals("jason", v.value("name"));
+        } finally {
+            cluster.close();
+        }
     }
 
     @Test
@@ -1496,43 +1583,47 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     @Test
     public void shouldProcessSessionRequestsInOrderAfterTimeout() throws Exception {
         final Cluster cluster = TestClientFactory.open();
-        final Client client = cluster.connect(name.getMethodName());
 
-        for(int index = 0; index < 50; index++)
-        {
-            final CompletableFuture<ResultSet> first = client.submitAsync(
-                    "Object mon1 = 'mon1';\n" +
-                            "synchronized (mon1) {\n" +
-                            "    mon1.wait();\n" +
-                            "} ");
-
-            final CompletableFuture<ResultSet> second = client.submitAsync(
-                    "Object mon2 = 'mon2';\n" +
-                            "synchronized (mon2) {\n" +
-                            "    mon2.wait();\n" +
-                            "}");
-
-            final CompletableFuture<ResultSet> third = client.submitAsync(
-                    "Object mon3 = 'mon3';\n" +
-                            "synchronized (mon3) {\n" +
-                            "    mon3.wait();\n" +
-                            "}");
-
-            final CompletableFuture<ResultSet> fourth = client.submitAsync(
-                    "Object mon4 = 'mon4';\n" +
-                            "synchronized (mon4) {\n" +
-                            "    mon4.wait();\n" +
-                            "}");
-
-            final CompletableFuture<List<Result>> futureFirst = first.get().all();
-            final CompletableFuture<List<Result>> futureSecond = second.get().all();
-            final CompletableFuture<List<Result>> futureThird = third.get().all();
-            final CompletableFuture<List<Result>> futureFourth = fourth.get().all();
-
-            assertFutureTimeout(futureFirst);
-            assertFutureTimeout(futureSecond);
-            assertFutureTimeout(futureThird);
-            assertFutureTimeout(futureFourth);
+        try {
+            final Client client = cluster.connect(name.getMethodName());
+
+            for (int index = 0; index < 50; index++) {
+                final CompletableFuture<ResultSet> first = client.submitAsync(
+                        "Object mon1 = 'mon1';\n" +
+                                "synchronized (mon1) {\n" +
+                                "    mon1.wait();\n" +
+                                "} ");
+
+                final CompletableFuture<ResultSet> second = client.submitAsync(
+                        "Object mon2 = 'mon2';\n" +
+                                "synchronized (mon2) {\n" +
+                                "    mon2.wait();\n" +
+                                "}");
+
+                final CompletableFuture<ResultSet> third = client.submitAsync(
+                        "Object mon3 = 'mon3';\n" +
+                                "synchronized (mon3) {\n" +
+                                "    mon3.wait();\n" +
+                                "}");
+
+                final CompletableFuture<ResultSet> fourth = client.submitAsync(
+                        "Object mon4 = 'mon4';\n" +
+                                "synchronized (mon4) {\n" +
+                                "    mon4.wait();\n" +
+                                "}");
+
+                final CompletableFuture<List<Result>> futureFirst = first.get().all();
+                final CompletableFuture<List<Result>> futureSecond = second.get().all();
+                final CompletableFuture<List<Result>> futureThird = third.get().all();
+                final CompletableFuture<List<Result>> futureFourth = fourth.get().all();
+
+                assertFutureTimeout(futureFirst);
+                assertFutureTimeout(futureSecond);
+                assertFutureTimeout(futureThird);
+                assertFutureTimeout(futureFourth);
+            }
+        } finally {
+            cluster.close();
         }
     }
 
@@ -1643,21 +1734,22 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     public void shouldSendRequestIdBytecode() {
         final UUID overrideRequestId = UUID.randomUUID();
         final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create();
-        final Client client = Mockito.spy(cluster.connect().alias("g"));
-        Mockito.when(client.alias("g")).thenReturn(client);
-        GraphTraversalSource g = AnonymousTraversalSource.traversal().withRemote(DriverRemoteConnection.using(client));
-        g.with(Tokens.REQUEST_ID, overrideRequestId).V().iterate();
-        cluster.close();
-        ArgumentCaptor<RequestOptions> requestOptionsCaptor = ArgumentCaptor.forClass(RequestOptions.class);
-        verify(client).submitAsync(Mockito.any(Bytecode.class), requestOptionsCaptor.capture());
-        RequestOptions requestOptions = requestOptionsCaptor.getValue();
-        assertTrue(requestOptions.getOverrideRequestId().isPresent());
-        assertEquals(overrideRequestId, requestOptions.getOverrideRequestId().get());
+            final Client client = Mockito.spy(cluster.connect().alias("g"));
+            Mockito.when(client.alias("g")).thenReturn(client);
+            GraphTraversalSource g = AnonymousTraversalSource.traversal().withRemote(DriverRemoteConnection.using(client));
+            g.with(Tokens.REQUEST_ID, overrideRequestId).V().iterate();
+            cluster.close();
+            ArgumentCaptor<RequestOptions> requestOptionsCaptor = ArgumentCaptor.forClass(RequestOptions.class);
+            verify(client).submitAsync(Mockito.any(Bytecode.class), requestOptionsCaptor.capture());
+            RequestOptions requestOptions = requestOptionsCaptor.getValue();
+            assertTrue(requestOptions.getOverrideRequestId().isPresent());
+            assertEquals(overrideRequestId, requestOptions.getOverrideRequestId().get());
+
+            ArgumentCaptor<RequestMessage> requestMessageCaptor = ArgumentCaptor.forClass(RequestMessage.class);
+            verify(client).submitAsync(requestMessageCaptor.capture());
+            RequestMessage requestMessage = requestMessageCaptor.getValue();
+            assertEquals(overrideRequestId, requestMessage.getRequestId());
 
-        ArgumentCaptor<RequestMessage> requestMessageCaptor = ArgumentCaptor.forClass(RequestMessage.class);
-        verify(client).submitAsync(requestMessageCaptor.capture());
-        RequestMessage requestMessage = requestMessageCaptor.getValue();
-        assertEquals(overrideRequestId, requestMessage.getRequestId());
     }
 
     private void assertFutureTimeout(final CompletableFuture<List<Result>> futureFirst) {
@@ -1678,5 +1770,6 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     public void shouldClusterReadFileFromResources() throws Exception {
         final Cluster cluster = Cluster.open(TestClientFactory.RESOURCE_PATH);
         assertTrue(cluster != null);
+        cluster.close();
     }
 }
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java
index 5d34a78..8eb6106 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java
@@ -42,15 +42,10 @@ import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedWriter;
-import java.io.FileWriter;
 import java.util.Base64;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
 import static org.apache.log4j.Level.INFO;
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
index b401303..9cf203b 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
@@ -36,6 +36,7 @@ import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -57,7 +58,7 @@ import static org.junit.Assert.fail;
 /**
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-public class GremlinServerSessionIntegrateTest  extends AbstractGremlinServerIntegrationTest {
+public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerIntegrationTest {
     private Log4jRecordingAppender recordingAppender = null;
     private Level originalLevel;
 
@@ -65,15 +66,27 @@ public class GremlinServerSessionIntegrateTest  extends AbstractGremlinServerInt
     public void setupForEachTest() {
         recordingAppender = new Log4jRecordingAppender();
         final Logger rootLogger = Logger.getRootLogger();
-        rootLogger.addAppender(recordingAppender);
         originalLevel = rootLogger.getLevel();
+        final String nameOfTest = name.getMethodName();
+        switch (nameOfTest) {
+            case "shouldCloseSessionOnceOnRequest":
+                Logger.getRootLogger().setLevel(Level.INFO);
+                break;
+            case "shouldBlockAdditionalRequestsDuringForceClose":
+                Logger.getRootLogger().setLevel(Level.INFO);
+                break;
+            case "shouldHaveTheSessionTimeout":
+                Logger.getRootLogger().setLevel(Level.INFO);
+                break;
+        }
+        rootLogger.addAppender(recordingAppender);
     }
 
     @After
     public void teardownForEachTest() {
         final Logger rootLogger = Logger.getRootLogger();
-        rootLogger.removeAppender(recordingAppender);
         rootLogger.setLevel(originalLevel);
+        rootLogger.removeAppender(recordingAppender);
     }
 
     /**
@@ -91,12 +104,10 @@ public class GremlinServerSessionIntegrateTest  extends AbstractGremlinServerInt
                 processorSettings.config = new HashMap<>();
                 processorSettings.config.put(SessionOpProcessor.CONFIG_SESSION_TIMEOUT, 3000L);
                 settings.processors.add(processorSettings);
-                Logger.getRootLogger().setLevel(Level.INFO);
                 break;
             case "shouldBlockAdditionalRequestsDuringClose":
             case "shouldBlockAdditionalRequestsDuringForceClose":
                 clearNeo4j(settings);
-                Logger.getRootLogger().setLevel(Level.INFO);
                 break;
             case "shouldEnsureSessionBindingsAreThreadSafe":
                 settings.threadPoolWorker = 2;
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/AbstractGremlinServerChannelizerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/AbstractGremlinServerChannelizerIntegrateTest.java
index b2054f8..a7ea488 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/AbstractGremlinServerChannelizerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/AbstractGremlinServerChannelizerIntegrateTest.java
@@ -44,6 +44,7 @@ import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
 import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Base64;
 
 import static org.junit.Assert.assertEquals;
@@ -131,15 +132,23 @@ abstract class AbstractGremlinServerChannelizerIntegrateTest extends AbstractGre
         CombinedTestClient client =  new CombinedTestClient(getProtocol());
         try {
             client.sendAndAssertUnauthorized("2+2", "stephen", "notpassword");
+        } finally {
             client.close();
-            client = new CombinedTestClient(getProtocol());
+        }
+
+        client = new CombinedTestClient(getProtocol());
+        try {
             client.sendAndAssert("2+2", 4, "stephen", "password");
+        } finally {
+            client.close();
+        }
+
+        client = new CombinedTestClient(getProtocol());
+        try {
             // Expect exception when try again if the server pipeline is correct
             client.sendAndAssertUnauthorized("2+2", "stephen", "notpassword");
+        } finally {
             client.close();
-        } catch (Exception e) {
-            client.close();
-            throw e;
         }
     }
 
@@ -148,15 +157,23 @@ abstract class AbstractGremlinServerChannelizerIntegrateTest extends AbstractGre
         CombinedTestClient client =  new CombinedTestClient(getSecureProtocol());
         try {
             client.sendAndAssertUnauthorized("2+2", "stephen", "incorrect-password");
+        } finally {
             client.close();
-            client = new CombinedTestClient(getSecureProtocol());
+        }
+
+        client = new CombinedTestClient(getSecureProtocol());
+        try {
             client.sendAndAssert("2+2", 4, "stephen", "password");
+        } finally {
+            client.close();
+        }
+
+        client = new CombinedTestClient(getSecureProtocol());
+        try {
             // Expect exception when try again if the server pipeline is correct
             client.sendAndAssertUnauthorized("2+2", "stephen", "incorrect-password");
+        } finally {
             client.close();
-        } catch (Exception e) {
-            client.close();
-            throw e;
         }
     }
 
@@ -228,7 +245,10 @@ abstract class AbstractGremlinServerChannelizerIntegrateTest extends AbstractGre
             sendAndAssert(gremlin, result, null, null);
         }
 
-        public void close() {
+        public void close() throws IOException {
+            if (httpClient != null) {
+                httpClient.close();
+            }
             if (wsCluster != null) {
                 wsCluster.close();
             }