You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2022/11/08 18:44:22 UTC

[tinkerpop] 01/01: TINKERPOP-2813 Removed fast NoHostAvailableException

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch TINKERPOP-2813
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 44b6dfba1bcc9cd048a3265ab0268acd4d328cf8
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Fri Nov 4 15:16:19 2022 -0400

    TINKERPOP-2813 Removed fast NoHostAvailableException
    
    Took a more optimistic approach to determining Host availability which prevents what may be intermittent network/server problems from being interpreted as the Host not being reachable. Takes a faster approach to reconnecting when a host is found to "maybe" be unavailable.
---
 CHANGELOG.asciidoc                                 |   3 +
 .../apache/tinkerpop/gremlin/driver/Client.java    |  27 +++--
 .../tinkerpop/gremlin/driver/Connection.java       |  66 ++++++-----
 .../gremlin/driver/ConnectionFactory.java          |  41 +++++++
 .../tinkerpop/gremlin/driver/ConnectionPool.java   | 123 +++++++++++++++------
 .../org/apache/tinkerpop/gremlin/driver/Host.java  |  18 +++
 .../driver/exception/NoHostAvailableException.java |   8 +-
 .../driver/ClientConnectionIntegrateTest.java      | 108 ++++++++++++++++++
 .../AbstractGremlinServerIntegrationTest.java      |   8 +-
 .../gremlin/server/GremlinDriverIntegrateTest.java |   2 -
 10 files changed, 328 insertions(+), 76 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 0299de7d5e..10b1b14a96 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -40,6 +40,9 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 * Modified `Connection` and `Host` job scheduling in `gremlin-driver` by dividing their work to two different thread pools and sparing work from the primary pool responsible for submitting requests and reading results.
 * Prevented usage of the fork-join pool for `gremlin-driver` job scheduling.
 * Changed `Host` initialization within a `Client` to be parallel again in `gremlin-driver`.
+* Changed mechanism for determining `Host` health which should make the driver more resilient to intermittent network failures.
+* Removed the delay for reconnecting to a potentially unhealthy `Host`, only marking it as unavailable after that initial retry fails.
+* Prevented fast `NoHostAvailableException` in favor of more direct exceptions when borrowing connections from the `ConnectionPool`.
 
 ==== Bugs
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index bca2932d7a..0226d32e18 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -40,6 +40,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -50,6 +51,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * A {@code Client} is constructed from a {@link Cluster} and represents a way to send messages to Gremlin Server.
@@ -68,6 +70,8 @@ public abstract class Client {
     protected volatile boolean initialized;
     protected final Client.Settings settings;
 
+    private static final Random random = new Random();
+
     Client(final Cluster cluster, final Client.Settings settings) {
         this.cluster = cluster;
         this.settings = settings;
@@ -423,7 +427,7 @@ public abstract class Client {
      */
     public final static class ClusteredClient extends Client {
 
-        protected ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>();
+        ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>();
         private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null);
         private Throwable initializationFailure = null;
 
@@ -492,7 +496,11 @@ public abstract class Client {
         protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {
             final Iterator<Host> possibleHosts;
             if (msg.optionalArgs(Tokens.ARGS_HOST).isPresent()) {
-                // TODO: not sure what should be done if unavailable - select new host and re-submit traversal?
+                // looking at this code about putting the Host on the RequestMessage in light of 3.5.4, not sure
+                // this is being used as intended here. server side usage is to place the channel.remoteAddress
+                // in this token in the status metadata for the response. can't remember why it is being used this
+                // way here exactly. created TINKERPOP-2821 to examine this more carefully to clean this up in a
+                // future version.
                 final Host host = (Host) msg.getArgs().get(Tokens.ARGS_HOST);
                 msg.getArgs().remove(Tokens.ARGS_HOST);
                 possibleHosts = IteratorUtils.of(host);
@@ -500,16 +508,19 @@ public abstract class Client {
                 possibleHosts = this.cluster.loadBalancingStrategy().select(msg);
             }
 
-            // you can get no possible hosts in more than a few situations. perhaps the servers are just all down.
-            // or perhaps the client is not configured properly (disables ssl when ssl is enabled on the server).
-            if (!possibleHosts.hasNext())
-                throwNoHostAvailableException();
-
-            final Host bestHost = possibleHosts.next();
+            // try a random host if none are marked available. maybe it will reconnect in the meantime. better than
+            // going straight to a fast NoHostAvailableException as was the case in versions 3.5.4 and earlier
+            final Host bestHost = possibleHosts.hasNext() ? possibleHosts.next() : chooseRandomHost();
             final ConnectionPool pool = hostConnectionPools.get(bestHost);
             return pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
         }
 
+        private Host chooseRandomHost() {
+            final List<Host> hosts = new ArrayList<>(cluster.allHosts());
+            final int ix = random.nextInt(hosts.size());
+            return hosts.get(ix);
+        }
+
         /**
          * Initializes the connection pools on all hosts.
          */
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 81dd440891..0dfe0dbab3 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -114,11 +114,8 @@ final class Connection {
             channel = b.connect(uri.getHost(), uri.getPort()).sync().channel();
             channelizer.connected();
 
-            /* Configure behaviour on close of this channel.
-             *
-             * This callback would trigger the workflow to destroy this connection, so that a new request doesn't pick
-             * this closed connection.
-             */
+            // Configure behaviour on close of this channel. This callback would trigger the workflow to destroy this
+            // connection, so that a new request doesn't pick this closed connection.
             final Connection thisConnection = this;
             channel.closeFuture().addListener((ChannelFutureListener) future -> {
                 logger.debug("OnChannelClose callback called for channel {}", channel);
@@ -138,7 +135,7 @@ final class Connection {
 
             logger.info("Created new connection for {}", uri);
         } catch (Exception ex) {
-            throw new ConnectionException(uri, "Could not open " + this.getConnectionInfo(true), ex);
+            throw new ConnectionException(uri, "Could not open " + getConnectionInfo(true), ex);
         }
     }
 
@@ -316,6 +313,9 @@ final class Connection {
                         RequestMessage.build(Tokens.OPS_CLOSE).addArg(Tokens.ARGS_FORCE, forceClose)).create();
 
                 final CompletableFuture<ResultSet> closed = new CompletableFuture<>();
+
+                // TINKERPOP-2822 should investigate this write more carefully to check for sensible behavior
+                // in the event the Channel was not created but we try to send the close message
                 write(closeMessage, closed);
 
                 try {
@@ -336,27 +336,37 @@ final class Connection {
                 }
             }
 
-            channelizer.close(channel);
+            // take a defensive posture here in the event the channelizer didn't get initialized somehow and a
+            // close() on the Connection is still called
+            if (channelizer != null)
+                channelizer.close(channel);
+
+            // seems possible that the channelizer could initialize but fail to produce a channel, so worth checking
+            // null before proceeding here
+            if (channel != null) {
+                final ChannelPromise promise = channel.newPromise();
+                promise.addListener(f -> {
+                    if (f.cause() != null) {
+                        future.completeExceptionally(f.cause());
+                    } else {
+                        if (logger.isDebugEnabled())
+                            logger.debug("{} destroyed successfully.", connectionInfo);
 
-            final ChannelPromise promise = channel.newPromise();
-            promise.addListener(f -> {
-                if (f.cause() != null) {
-                    future.completeExceptionally(f.cause());
-                } else {
-                    if (logger.isDebugEnabled())
-                        logger.debug("{} destroyed successfully.", connectionInfo);
+                        future.complete(null);
+                    }
+                });
 
-                    future.complete(null);
+                // close the netty channel, if not already closed
+                if (!channel.closeFuture().isDone()) {
+                    channel.close(promise);
+                } else {
+                    if (!promise.trySuccess()) {
+                        logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
+                    }
                 }
-            });
-
-            // close the netty channel, if not already closed
-            if (!channel.closeFuture().isDone()) {
-                channel.close(promise);
             } else {
-                if (!promise.trySuccess()) {
-                    logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
-                }
+                logger.debug("Connection {} is shutting down but the channel was not initialized to begin with",
+                        getConnectionInfo());
             }
         }
     }
@@ -365,7 +375,7 @@ final class Connection {
      * Gets a message that describes the state of the connection.
      */
     public String getConnectionInfo() {
-        return getConnectionInfo(true);
+        return this.getConnectionInfo(true);
     }
 
     /**
@@ -375,10 +385,10 @@ final class Connection {
      */
     public String getConnectionInfo(final boolean showHost) {
         return showHost ?
-                String.format("Connection{channel=%s, host=%s, isDead=%s, borrowed=%s, pending=%s}",
-                getChannelId(), pool.host, isDead(), borrowed, pending.size()) :
-                String.format("Connection{channel=%s, isDead=%s, borrowed=%s, pending=%s}",
-                        channel.id().asShortText(), isDead(), borrowed, pending.size());
+                String.format("Connection{channel=%s host=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s}",
+                        getChannelId(), pool.host.toString(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing()) :
+                String.format("Connection{channel=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s}",
+                        getChannelId(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing());
     }
 
     /**
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionFactory.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionFactory.java
new file mode 100644
index 0000000000..0771402c70
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+
+/**
+ * A factory that is responsible for creating a {@link Connection} instance. The {@link DefaultConnectionFactory}
+ * simply news up a {@code Connection} using its default constructor. This interface is mostly present to help
+ * enable better testing of the driver internals and likely shouldn't be used otherwise.
+ */
+interface ConnectionFactory {
+
+    /**
+     * Create a connection for the specified {@link ConnectionPool}.
+     */
+    public default Connection create(final ConnectionPool pool) throws ConnectionException {
+        return new Connection(pool.host.getHostUri(), pool, pool.settings().maxInProcessPerConnection);
+    }
+
+    /**
+     * Default implementation.
+     */
+    public static class DefaultConnectionFactory implements ConnectionFactory { }
+}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index b1a9632738..4e5f019273 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -67,12 +67,12 @@ final class ConnectionPool {
     private final String poolLabel;
 
     private final AtomicInteger scheduledForCreation = new AtomicInteger();
-
     private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
 
     private volatile int waiter = 0;
     private final Lock waitLock = new ReentrantLock(true);
     private final Condition hasAvailableConnection = waitLock.newCondition();
+    ConnectionFactory connectionFactory;
 
     public ConnectionPool(final Host host, final Client client) {
         this(host, client, Optional.empty(), Optional.empty());
@@ -80,9 +80,15 @@ final class ConnectionPool {
 
     public ConnectionPool(final Host host, final Client client, final Optional<Integer> overrideMinPoolSize,
                           final Optional<Integer> overrideMaxPoolSize) {
+        this(host, client, overrideMinPoolSize, overrideMaxPoolSize, new ConnectionFactory.DefaultConnectionFactory());
+    }
+
+    ConnectionPool(final Host host, final Client client, final Optional<Integer> overrideMinPoolSize,
+                          final Optional<Integer> overrideMaxPoolSize, final ConnectionFactory connectionFactory) {
         this.host = host;
         this.client = client;
         this.cluster = client.cluster;
+        this.connectionFactory = connectionFactory;
         poolLabel = "Connection Pool {host=" + host + "}";
 
         final Settings.ConnectionPoolSettings settings = settings();
@@ -100,7 +106,7 @@ final class ConnectionPool {
             for (int i = 0; i < minPoolSize; i++) {
                 connectionCreationFutures.add(CompletableFuture.runAsync(() -> {
                     try {
-                        this.connections.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
+                        this.connections.add(connectionFactory.create(this));
                         this.open.incrementAndGet();
                     } catch (ConnectionException e) {
                         throw new CompletionException(e);
@@ -110,7 +116,7 @@ final class ConnectionPool {
 
             CompletableFuture.allOf(connectionCreationFutures.toArray(new CompletableFuture[0])).join();
         } catch (CancellationException ce) {
-            logger.warn("Initialization of connections cancelled for {}", getPoolInfo(), ce);
+            logger.warn("Initialization of connections cancelled for {}", this.getPoolInfo(), ce);
             throw ce;
         } catch (CompletionException ce) {
             // Some connections might have been initialized. Close the connection pool gracefully to close them.
@@ -312,20 +318,31 @@ final class ConnectionPool {
 
     private void newConnection() {
         cluster.connectionScheduler().submit(() -> {
-            addConnectionIfUnderMaximum();
+            // seems like this should be decremented first because if addConnectionIfUnderMaximum() fails there is
+            // nothing else that will decrement this number, which leaves things in a state where newConnection()
+            // doesn't seem to get called at all because the pool believes connections are currently being
+            // created. this seems to lead to situations where the client can never borrow a connection and it's as
+            // though it can't reconnect at all. this was hard to test, but it became reproducible after
+            // introducing the ConnectionFactory, which enabled a way to introduce connection jitters (i.e. failures
+            // in creation of a Connection), at which point it seemed to happen with some regularity.
             scheduledForCreation.decrementAndGet();
+            addConnectionIfUnderMaximum();
             return null;
         });
     }
 
     private boolean addConnectionIfUnderMaximum() {
+        final int openCountToActOn;
+
         while (true) {
-            int opened = open.get();
+            final int opened = open.get();
             if (opened >= maxPoolSize)
                 return false;
 
-            if (open.compareAndSet(opened, opened + 1))
+            if (open.compareAndSet(opened, opened + 1)) {
+                openCountToActOn = opened;
                 break;
+            }
         }
 
         if (isClosed()) {
@@ -334,10 +351,13 @@ final class ConnectionPool {
         }
 
         try {
-            connections.add(new Connection(host.getHostUri(), this, settings().maxInProcessPerConnection));
-        } catch (ConnectionException ce) {
-            logger.error("Connections were under max, but there was an error creating the connection.", ce);
+            connections.add(connectionFactory.create(this));
+        } catch (Exception ce) {
             open.decrementAndGet();
+            logger.error(String.format(
+                    "Connections[%s] were under maximum allowed[%s], but there was an error creating a new connection",
+                            openCountToActOn, maxPoolSize),
+                    ce);
             considerHostUnavailable();
             return false;
         }
@@ -432,20 +452,33 @@ final class ConnectionPool {
 
         // if we timeout borrowing a connection that might mean the host is dead (or the timeout was super short).
         // either way supply a function to reconnect
+        final TimeoutException timeoutException = new TimeoutException(timeoutErrorMessage);
         this.considerHostUnavailable();
 
-        throw new TimeoutException(timeoutErrorMessage);
+        throw timeoutException;
     }
 
+    /**
+     * On a failure to get a {@link Connection} this method is called to determine if the {@link Host} should be
+     * marked as unavailable and to establish a background reconnect operation.
+     */
     public void considerHostUnavailable() {
-        // called when a connection is "dead" due to a non-recoverable error.
-        host.makeUnavailable(this::tryReconnect);
-
-        // if the host is unavailable then we should release the connections
-        connections.forEach(this::definitelyDestroyConnection);
-
-        // let the load-balancer know that the host is acting poorly
-        this.cluster.loadBalancingStrategy().onUnavailable(host);
+        // if there is at least one available connection the host has to still be around (or is perhaps on its way out
+        // but we'll stay optimistic in this check). no connections also means "unhealthy". unsure if there is an ok
+        // "no connections" state we can get into. in any event if there are no connections then we'd just try to
+        // immediately reconnect below anyway so perhaps that state isn't really something to worry about.
+        final boolean maybeUnhealthy = connections.stream().allMatch(Connection::isDead);
+        if (maybeUnhealthy) {
+            // immediately fire off an attempt to reconnect because there are no active connections.
+            host.tryReconnectingImmediately(this::tryReconnect);
+
+            // let the load-balancer know that the host is acting poorly
+            if (!host.isAvailable()) {
+                // if the host is unavailable then we should release the connections
+                connections.forEach(this::definitelyDestroyConnection);
+                this.cluster.loadBalancingStrategy().onUnavailable(host);
+            }
+        }
     }
 
     /**
@@ -457,7 +490,13 @@ final class ConnectionPool {
 
         Connection connection = null;
         try {
-            connection = borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
+            // rather than rely on borrowConnection() infrastructure and the pool, create a brand new Connection
+            // instance solely for the purpose of this ping. this ensures that if the pool is overloaded we still
+            // make an honest attempt at validating host health without failing over some timeout waiting for a
+            // connection in the pool. not sure if we should try to keep this connection if it succeeds and if the
+            // pool needs it. for now that seems like an unnecessary added bit of complexity for dealing with this
+            // error state
+            connection = connectionFactory.create(this);
             final RequestMessage ping = client.buildMessage(cluster.validationRequest()).create();
             final CompletableFuture<ResultSet> f = new CompletableFuture<>();
             connection.write(ping, f);
@@ -467,9 +506,13 @@ final class ConnectionPool {
             this.cluster.loadBalancingStrategy().onAvailable(h);
             return true;
         } catch (Exception ex) {
-            logger.debug("Failed reconnect attempt on {}", h, ex);
-            if (connection != null) definitelyDestroyConnection(connection);
+            logger.error(String.format("Failed reconnect attempt on %s%s%s",
+                            h, System.lineSeparator(), this.getPoolInfo()), ex);
             return false;
+        } finally {
+            if (connection != null) {
+                connection.closeAsync();
+            }
         }
     }
 
@@ -547,7 +590,7 @@ final class ConnectionPool {
      */
     public String getPoolInfo(final Connection connectionToCallout) {
         final StringBuilder sb = new StringBuilder("ConnectionPool (");
-        sb.append(host);
+        sb.append(host.toString());
         sb.append(")");
 
         if (connections.isEmpty()) {
@@ -555,25 +598,35 @@ final class ConnectionPool {
         } else {
             final int connectionCount = connections.size();
             sb.append(System.lineSeparator());
-            sb.append(String.format("Connection Pool Status (size=%s max=%s min=%s)", connectionCount, maxPoolSize, minPoolSize));
+            sb.append(String.format("Connection Pool Status (size=%s max=%s min=%s toCreate=%s markedOpen=%s bin=%s)",
+                    connectionCount, maxPoolSize, minPoolSize, this.scheduledForCreation.get(), this.open.get(), bin.size()));
+            sb.append(System.lineSeparator());
+
+            appendConnections(sb, connectionToCallout, connections);
+            sb.append(System.lineSeparator());
+            sb.append("-- bin --");
             sb.append(System.lineSeparator());
+            appendConnections(sb, connectionToCallout, new ArrayList<>(bin));
+        }
 
-            for (int ix = 0; ix < connectionCount; ix++) {
-                final Connection c = connections.get(ix);
+        return sb.toString().trim();
+    }
 
-                if (c == connectionToCallout)
-                    sb.append("==> ");
-                else
-                    sb.append("> ");
+    private void appendConnections(final StringBuilder sb, final Connection connectionToCallout,
+                                   final List<Connection> connections) {
+        final int connectionCount = connections.size();
+        for (int ix = 0; ix < connectionCount; ix++) {
+            final Connection c = connections.get(ix);
+            if (c.equals(connectionToCallout))
+                sb.append("==> ");
+            else
+                sb.append("> ");
 
-                sb.append(c.getConnectionInfo(false));
+            sb.append(c.getConnectionInfo(false));
 
-                if (ix < connectionCount - 1)
-                    sb.append(System.lineSeparator());
-            }
+            if (ix < connectionCount - 1)
+                sb.append(System.lineSeparator());
         }
-
-        return sb.toString().trim();
     }
 
     @Override
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
index fb0a9e333a..3d492fa612 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
@@ -26,6 +26,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
@@ -85,6 +86,23 @@ public final class Host {
         }
     }
 
+    void tryReconnectingImmediately(final Function<Host, Boolean> reconnect) {
+        // only do a connection re-attempt if one is not already in progress
+        if (retryInProgress.compareAndSet(Boolean.FALSE, Boolean.TRUE)) {
+            retryThread = this.cluster.hostScheduler().scheduleAtFixedRate(() -> {
+                        logger.debug("Trying to reconnect to host at {}", this);
+                        final boolean reconnected = reconnect.apply(this);
+                        if (reconnected)
+                            reconnected();
+                        else {
+                            logger.warn("Marking {} as unavailable. Trying to reconnect.", this);
+                            isAvailable = false;
+                        }
+                    }, 0,
+                    cluster.connectionPoolSettings().reconnectInterval, TimeUnit.MILLISECONDS);
+        }
+    }
+
     private void reconnected() {
         // race condition!  retry boolean could be set to false, a new retryThread created above
         // and then cancelled here.   But we're only executing this at all because we *have* reconnected
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java
index 1d2a70f7ea..9ccea2dffa 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java
@@ -21,10 +21,14 @@ package org.apache.tinkerpop.gremlin.driver.exception;
 public class NoHostAvailableException extends RuntimeException {
 
     public NoHostAvailableException() {
-        super("All hosts are considered unavailable due to previous exceptions. Check the error log to find the actual reason.");
+        this("All hosts are considered unavailable due to previous exceptions. Check the error log to find the actual reason.");
     }
 
-    public NoHostAvailableException(Throwable ex) {
+    public NoHostAvailableException(final String message) {
+        super(message);
+    }
+
+    public NoHostAvailableException(final Throwable ex) {
         super(ex);
     }
 
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
index 8ebad7809e..1417409c47 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
@@ -21,6 +21,8 @@ package org.apache.tinkerpop.gremlin.driver;
 import io.netty.handler.codec.CorruptedFrameException;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
+import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
 import org.apache.tinkerpop.gremlin.server.AbstractGremlinServerIntegrationTest;
 import org.apache.tinkerpop.gremlin.server.TestClientFactory;
@@ -28,13 +30,25 @@ import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLHandshakeException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.number.OrderingComparison.greaterThan;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrationTest {
+    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(ClientConnectionIntegrateTest.class);
     private Log4jRecordingAppender recordingAppender = null;
     private Level previousLogLevel;
 
@@ -110,4 +124,98 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat
         assertThat(recordingAppender.logContainsAny("^(?!.*(isDead=false)).*isDead=true.*destroyed successfully.$"), is(true));
 
     }
+
+    /**
+     * Added for TINKERPOP-2813 - this scenario would have previously thrown tons of
+     * {@link NoHostAvailableException}.
+     */
+    @Test
+    public void shouldSucceedWithJitteryConnection() throws Exception {
+        final Cluster cluster = TestClientFactory.build().minConnectionPoolSize(1).maxConnectionPoolSize(4).
+                reconnectInterval(1000).
+                maxWaitForConnection(4000).validationRequest("g.inject()").create();
+        final Client.ClusteredClient client = cluster.connect();
+
+        client.init();
+
+        // every 3rd connection attempt let's have some problems (takes effect once jittery is enabled)
+        final JitteryConnectionFactory connectionFactory = new JitteryConnectionFactory(3);
+        client.hostConnectionPools.forEach((h, pool) -> pool.connectionFactory = connectionFactory);
+
+        // get an initial connection which marks the host as available
+        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
+
+        // network is gonna get fishy - ConnectionPool should try to grow during the workload below and when it
+        // does some connections will fail to create in the background which should log some errors but not tank
+        // the submit() as connections that are currently still working and active should be able to handle the load.
+        connectionFactory.jittery = true;
+
+        // load up a hella ton of requests
+        final int requests = 1000;
+        final CountDownLatch latch = new CountDownLatch(requests);
+        final AtomicBoolean hadFailOtherThanTimeout = new AtomicBoolean(false);
+
+        new Thread(() -> {
+            IntStream.range(0, requests).forEach(i -> {
+                try {
+                    client.submitAsync("1 + " + i);
+                } catch (Exception ex) {
+                    // we could catch a TimeoutException here in some cases if the jitters cause a borrow of a
+                    // connection to take too long. submitAsync() will wrap in a RuntimeException. can't assert
+                    // this condition inside this thread or the test locks up
+                    hadFailOtherThanTimeout.compareAndSet(false, !(ex.getCause() instanceof TimeoutException));
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }, "worker-shouldSucceedWithJitteryConnection").start();
+
+        // wait long enough for the jitters to kick in at least a little
+        while (latch.getCount() > 500) {
+            TimeUnit.MILLISECONDS.sleep(50);
+        }
+
+        // wait for all requests to be submitted (the latch counts down after each submitAsync() call returns)
+        assertTrue(latch.await(30000, TimeUnit.MILLISECONDS));
+
+        // make sure we had some failures for sure coming out the factory
+        assertThat(connectionFactory.getNumberOfFailures(), is(greaterThan(0L)));
+
+        // if there was an exception in the worker thread, then it had better be a TimeoutException
+        assertThat(hadFailOtherThanTimeout.get(), is(false));
+
+        cluster.close();
+    }
+
+    /**
+     * Introduces intermittent failures when creating a {@link Connection} for the {@link ConnectionPool}.
+     */
+    public static class JitteryConnectionFactory implements ConnectionFactory {
+
+        private volatile boolean jittery = false;
+        private final AtomicLong connectionsCreated = new AtomicLong(0);
+        private final AtomicLong connectionFailures = new AtomicLong(0);
+        private final int numberOfConnectionsBetweenErrors;
+
+        public JitteryConnectionFactory(final int numberOfConnectionsBetweenErrors) {
+            this.numberOfConnectionsBetweenErrors = numberOfConnectionsBetweenErrors;
+        }
+
+        public long getNumberOfFailures() {
+            return connectionFailures.get();
+        }
+
+        @Override
+        public Connection create(final ConnectionPool pool) {
+
+            // fail creating a connection on every numberOfConnectionsBetweenErrors-th attempt when jittery
+            if (jittery && connectionsCreated.incrementAndGet() % numberOfConnectionsBetweenErrors == 0) {
+                connectionFailures.incrementAndGet();
+                throw new ConnectionException(pool.host.getHostUri(),
+                        new SSLHandshakeException("SSL on the funk - server is big mad"));
+            }
+
+            return ConnectionFactory.super.create(pool);
+        }
+    }
 }
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
index 2b66ce1de1..6a8acbad15 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.server;
 import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
 import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizerIntegrateTest;
 import org.apache.tinkerpop.gremlin.server.op.OpLoader;
+import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
 import org.junit.After;
 import org.junit.Before;
@@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.InputStream;
+import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assume.assumeThat;
@@ -117,6 +119,10 @@ public abstract class AbstractGremlinServerIntegrationTest {
     }
 
     public void startServer() throws Exception {
+        startServerAsync().join();
+    }
+
+    public CompletableFuture<ServerGremlinExecutor> startServerAsync() throws Exception {
         final InputStream stream = getSettingsInputStream();
         final Settings settings = Settings.read(stream);
         overriddenSettings = overrideSettings(settings);
@@ -131,7 +137,7 @@ public abstract class AbstractGremlinServerIntegrationTest {
 
         this.server = new GremlinServer(overriddenSettings);
 
-        server.start().join();
+        return server.start();
     }
 
     @After
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index e44660ee50..43a53bae7a 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -55,7 +55,6 @@ import org.apache.tinkerpop.gremlin.util.function.FunctionUtils;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
@@ -63,7 +62,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.awt.Color;
-import java.io.File;
 import java.net.ConnectException;
 import java.time.Instant;
 import java.util.ArrayList;