You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2022/11/08 18:44:22 UTC
[tinkerpop] 01/01: TINKERPOP-2813 Removed fast NoHostAvailableException
This is an automated email from the ASF dual-hosted git repository.
spmallette pushed a commit to branch TINKERPOP-2813
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 44b6dfba1bcc9cd048a3265ab0268acd4d328cf8
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Fri Nov 4 15:16:19 2022 -0400
TINKERPOP-2813 Removed fast NoHostAvailableException
Took a more optimistic approach to determining Host availability which prevents what may be intermittent network/server problems from being interpreted as the Host not being reachable. Takes a faster approach to reconnecting when a host is found to "maybe" be unavailable.
---
CHANGELOG.asciidoc | 3 +
.../apache/tinkerpop/gremlin/driver/Client.java | 27 +++--
.../tinkerpop/gremlin/driver/Connection.java | 66 ++++++-----
.../gremlin/driver/ConnectionFactory.java | 41 +++++++
.../tinkerpop/gremlin/driver/ConnectionPool.java | 123 +++++++++++++++------
.../org/apache/tinkerpop/gremlin/driver/Host.java | 18 +++
.../driver/exception/NoHostAvailableException.java | 8 +-
.../driver/ClientConnectionIntegrateTest.java | 108 ++++++++++++++++++
.../AbstractGremlinServerIntegrationTest.java | 8 +-
.../gremlin/server/GremlinDriverIntegrateTest.java | 2 -
10 files changed, 328 insertions(+), 76 deletions(-)
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 0299de7d5e..10b1b14a96 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -40,6 +40,9 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
* Modified `Connection` and `Host` job scheduling in `gremlin-driver` by dividing their work to two different thread pools and sparing work from the primary pool responsible for submitting requests and reading results.
* Prevented usage of the fork-join pool for `gremlin-driver` job scheduling.
* Changed `Host` initialization within a `Client` to be parallel again in `gremlin-driver`.
+* Changed mechanism for determining `Host` health which should make the driver more resilient to intermittent network failures.
+* Removed the delay for reconnecting to a potentially unhealthy `Host`, only marking it as unavailable after that initial retry fails.
+* Prevented fast `NoHostAvailableException` in favor of more direct exceptions when borrowing connections from the `ConnectionPool`.
==== Bugs
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index bca2932d7a..0226d32e18 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -40,6 +40,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -50,6 +51,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* A {@code Client} is constructed from a {@link Cluster} and represents a way to send messages to Gremlin Server.
@@ -68,6 +70,8 @@ public abstract class Client {
protected volatile boolean initialized;
protected final Client.Settings settings;
+ private static final Random random = new Random();
+
Client(final Cluster cluster, final Client.Settings settings) {
this.cluster = cluster;
this.settings = settings;
@@ -423,7 +427,7 @@ public abstract class Client {
*/
public final static class ClusteredClient extends Client {
- protected ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>();
+ ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>();
private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null);
private Throwable initializationFailure = null;
@@ -492,7 +496,11 @@ public abstract class Client {
protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {
final Iterator<Host> possibleHosts;
if (msg.optionalArgs(Tokens.ARGS_HOST).isPresent()) {
- // TODO: not sure what should be done if unavailable - select new host and re-submit traversal?
+ // looking at this code about putting the Host on the RequestMessage in light of 3.5.4, not sure
+ // this is being used as intended here. server side usage is to place the channel.remoteAddress
+ // in this token in the status metadata for the response. can't remember why it is being used this
+ // way here exactly. created TINKERPOP-2821 to examine this more carefully to clean this up in a
+ // future version.
final Host host = (Host) msg.getArgs().get(Tokens.ARGS_HOST);
msg.getArgs().remove(Tokens.ARGS_HOST);
possibleHosts = IteratorUtils.of(host);
@@ -500,16 +508,19 @@ public abstract class Client {
possibleHosts = this.cluster.loadBalancingStrategy().select(msg);
}
- // you can get no possible hosts in more than a few situations. perhaps the servers are just all down.
- // or perhaps the client is not configured properly (disables ssl when ssl is enabled on the server).
- if (!possibleHosts.hasNext())
- throwNoHostAvailableException();
-
- final Host bestHost = possibleHosts.next();
+ // try a random host if none are marked available. maybe it will reconnect in the meantime. better than
+ // going straight to a fast NoHostAvailableException as was the case in versions 3.5.4 and earlier
+ final Host bestHost = possibleHosts.hasNext() ? possibleHosts.next() : chooseRandomHost();
final ConnectionPool pool = hostConnectionPools.get(bestHost);
return pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
}
+ private Host chooseRandomHost() {
+ final List<Host> hosts = new ArrayList<>(cluster.allHosts());
+ final int ix = random.nextInt(hosts.size());
+ return hosts.get(ix);
+ }
+
/**
* Initializes the connection pools on all hosts.
*/
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 81dd440891..0dfe0dbab3 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -114,11 +114,8 @@ final class Connection {
channel = b.connect(uri.getHost(), uri.getPort()).sync().channel();
channelizer.connected();
- /* Configure behaviour on close of this channel.
- *
- * This callback would trigger the workflow to destroy this connection, so that a new request doesn't pick
- * this closed connection.
- */
+ // Configure behaviour on close of this channel. This callback would trigger the workflow to destroy this
+ // connection, so that a new request doesn't pick this closed connection.
final Connection thisConnection = this;
channel.closeFuture().addListener((ChannelFutureListener) future -> {
logger.debug("OnChannelClose callback called for channel {}", channel);
@@ -138,7 +135,7 @@ final class Connection {
logger.info("Created new connection for {}", uri);
} catch (Exception ex) {
- throw new ConnectionException(uri, "Could not open " + this.getConnectionInfo(true), ex);
+ throw new ConnectionException(uri, "Could not open " + getConnectionInfo(true), ex);
}
}
@@ -316,6 +313,9 @@ final class Connection {
RequestMessage.build(Tokens.OPS_CLOSE).addArg(Tokens.ARGS_FORCE, forceClose)).create();
final CompletableFuture<ResultSet> closed = new CompletableFuture<>();
+
+ // TINKERPOP-2822 should investigate this write more carefully to check for sensible behavior
+ // in the event the Channel was not created but we try to send the close message
write(closeMessage, closed);
try {
@@ -336,27 +336,37 @@ final class Connection {
}
}
- channelizer.close(channel);
+ // take a defensive posture here in the event the channelizer didn't get initialized somehow and a
+ // close() on the Connection is still called
+ if (channelizer != null)
+ channelizer.close(channel);
+
+ // seems possible that the channelizer could initialize but fail to produce a channel, so worth checking
+ // null before proceeding here
+ if (channel != null) {
+ final ChannelPromise promise = channel.newPromise();
+ promise.addListener(f -> {
+ if (f.cause() != null) {
+ future.completeExceptionally(f.cause());
+ } else {
+ if (logger.isDebugEnabled())
+ logger.debug("{} destroyed successfully.", connectionInfo);
- final ChannelPromise promise = channel.newPromise();
- promise.addListener(f -> {
- if (f.cause() != null) {
- future.completeExceptionally(f.cause());
- } else {
- if (logger.isDebugEnabled())
- logger.debug("{} destroyed successfully.", connectionInfo);
+ future.complete(null);
+ }
+ });
- future.complete(null);
+ // close the netty channel, if not already closed
+ if (!channel.closeFuture().isDone()) {
+ channel.close(promise);
+ } else {
+ if (!promise.trySuccess()) {
+ logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
+ }
}
- });
-
- // close the netty channel, if not already closed
- if (!channel.closeFuture().isDone()) {
- channel.close(promise);
} else {
- if (!promise.trySuccess()) {
- logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
- }
+ logger.debug("Connection {} is shutting down but the channel was not initialized to begin with",
+ getConnectionInfo());
}
}
}
@@ -365,7 +375,7 @@ final class Connection {
* Gets a message that describes the state of the connection.
*/
public String getConnectionInfo() {
- return getConnectionInfo(true);
+ return this.getConnectionInfo(true);
}
/**
@@ -375,10 +385,10 @@ final class Connection {
*/
public String getConnectionInfo(final boolean showHost) {
return showHost ?
- String.format("Connection{channel=%s, host=%s, isDead=%s, borrowed=%s, pending=%s}",
- getChannelId(), pool.host, isDead(), borrowed, pending.size()) :
- String.format("Connection{channel=%s, isDead=%s, borrowed=%s, pending=%s}",
- channel.id().asShortText(), isDead(), borrowed, pending.size());
+ String.format("Connection{channel=%s host=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s}",
+ getChannelId(), pool.host.toString(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing()) :
+ String.format("Connection{channel=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s}",
+ getChannelId(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing());
}
/**
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionFactory.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionFactory.java
new file mode 100644
index 0000000000..0771402c70
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+
+/**
+ * A factory that is responsible for creating a {@link Connection} instance. The {@link DefaultConnectionFactory}
+ * simply news up a {@code Connection} using its default constructor. This interface is mostly present to help
+ * enable better testing of the driver internals and likely shouldn't be used otherwise.
+ */
+interface ConnectionFactory {
+
+ /**
+ * Create a connection for the specified {@link ConnectionPool}.
+ */
+ public default Connection create(final ConnectionPool pool) throws ConnectionException {
+ return new Connection(pool.host.getHostUri(), pool, pool.settings().maxInProcessPerConnection);
+ }
+
+ /**
+ * Default implementation.
+ */
+ public static class DefaultConnectionFactory implements ConnectionFactory { }
+}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index b1a9632738..4e5f019273 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -67,12 +67,12 @@ final class ConnectionPool {
private final String poolLabel;
private final AtomicInteger scheduledForCreation = new AtomicInteger();
-
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
private volatile int waiter = 0;
private final Lock waitLock = new ReentrantLock(true);
private final Condition hasAvailableConnection = waitLock.newCondition();
+ ConnectionFactory connectionFactory;
public ConnectionPool(final Host host, final Client client) {
this(host, client, Optional.empty(), Optional.empty());
@@ -80,9 +80,15 @@ final class ConnectionPool {
public ConnectionPool(final Host host, final Client client, final Optional<Integer> overrideMinPoolSize,
final Optional<Integer> overrideMaxPoolSize) {
+ this(host, client, overrideMinPoolSize, overrideMaxPoolSize, new ConnectionFactory.DefaultConnectionFactory());
+ }
+
+ ConnectionPool(final Host host, final Client client, final Optional<Integer> overrideMinPoolSize,
+ final Optional<Integer> overrideMaxPoolSize, final ConnectionFactory connectionFactory) {
this.host = host;
this.client = client;
this.cluster = client.cluster;
+ this.connectionFactory = connectionFactory;
poolLabel = "Connection Pool {host=" + host + "}";
final Settings.ConnectionPoolSettings settings = settings();
@@ -100,7 +106,7 @@ final class ConnectionPool {
for (int i = 0; i < minPoolSize; i++) {
connectionCreationFutures.add(CompletableFuture.runAsync(() -> {
try {
- this.connections.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
+ this.connections.add(connectionFactory.create(this));
this.open.incrementAndGet();
} catch (ConnectionException e) {
throw new CompletionException(e);
@@ -110,7 +116,7 @@ final class ConnectionPool {
CompletableFuture.allOf(connectionCreationFutures.toArray(new CompletableFuture[0])).join();
} catch (CancellationException ce) {
- logger.warn("Initialization of connections cancelled for {}", getPoolInfo(), ce);
+ logger.warn("Initialization of connections cancelled for {}", this.getPoolInfo(), ce);
throw ce;
} catch (CompletionException ce) {
// Some connections might have been initialized. Close the connection pool gracefully to close them.
@@ -312,20 +318,31 @@ final class ConnectionPool {
private void newConnection() {
cluster.connectionScheduler().submit(() -> {
- addConnectionIfUnderMaximum();
+ // seems like this should be decremented first because if addConnectionIfUnderMaximum fails there is
+ // nothing that wants to decrement this number and so it leaves things in a state where
+ // newConnection() doesn't seem to get called at all because it believes connections are currently being
+ // created. this seems to lead to situations where the client can never borrow a connection and it's as
+ // though it can't reconnect at all. this was hard to test but it became easier to reproduce after
+ // introducing the ConnectionFactory, which enabled a way to introduce connection jitters (i.e. failures in
+ // creation of a Connection), at which point it seemed to happen with some regularity.
scheduledForCreation.decrementAndGet();
+ addConnectionIfUnderMaximum();
return null;
});
}
private boolean addConnectionIfUnderMaximum() {
+ final int openCountToActOn;
+
while (true) {
- int opened = open.get();
+ final int opened = open.get();
if (opened >= maxPoolSize)
return false;
- if (open.compareAndSet(opened, opened + 1))
+ if (open.compareAndSet(opened, opened + 1)) {
+ openCountToActOn = opened;
break;
+ }
}
if (isClosed()) {
@@ -334,10 +351,13 @@ final class ConnectionPool {
}
try {
- connections.add(new Connection(host.getHostUri(), this, settings().maxInProcessPerConnection));
- } catch (ConnectionException ce) {
- logger.error("Connections were under max, but there was an error creating the connection.", ce);
+ connections.add(connectionFactory.create(this));
+ } catch (Exception ce) {
open.decrementAndGet();
+ logger.error(String.format(
+ "Connections[%s] were under maximum allowed[%s], but there was an error creating a new connection",
+ openCountToActOn, maxPoolSize),
+ ce);
considerHostUnavailable();
return false;
}
@@ -432,20 +452,33 @@ final class ConnectionPool {
// if we timeout borrowing a connection that might mean the host is dead (or the timeout was super short).
// either way supply a function to reconnect
+ final TimeoutException timeoutException = new TimeoutException(timeoutErrorMessage);
this.considerHostUnavailable();
- throw new TimeoutException(timeoutErrorMessage);
+ throw timeoutException;
}
+ /**
+ * On a failure to get a {@link Connection} this method is called to determine if the {@link Host} should be
+ * marked as unavailable and to establish a background reconnect operation.
+ */
public void considerHostUnavailable() {
- // called when a connection is "dead" due to a non-recoverable error.
- host.makeUnavailable(this::tryReconnect);
-
- // if the host is unavailable then we should release the connections
- connections.forEach(this::definitelyDestroyConnection);
-
- // let the load-balancer know that the host is acting poorly
- this.cluster.loadBalancingStrategy().onUnavailable(host);
+ // if there is at least one available connection the host has to still be around (or is perhaps on its way out
+ // but we'll stay optimistic in this check). no connections also means "unhealthy". unsure if there is an ok
+ // "no connections" state we can get into. in any event if there are no connections then we'd just try to
+ // immediately reconnect below anyway so perhaps that state isn't really something to worry about.
+ final boolean maybeUnhealthy = connections.stream().allMatch(Connection::isDead);
+ if (maybeUnhealthy) {
+ // immediately fire off an attempt to reconnect because there are no active connections.
+ host.tryReconnectingImmediately(this::tryReconnect);
+
+ // let the load-balancer know that the host is acting poorly
+ if (!host.isAvailable()) {
+ // if the host is unavailable then we should release the connections
+ connections.forEach(this::definitelyDestroyConnection);
+ this.cluster.loadBalancingStrategy().onUnavailable(host);
+ }
+ }
}
/**
@@ -457,7 +490,13 @@ final class ConnectionPool {
Connection connection = null;
try {
- connection = borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
+ // rather than rely on borrowConnection() infrastructure and the pool create a brand new Connection
+ // instance solely for the purpose of this ping. this ensures that if the pool is overloaded that we
+ // make an honest attempt at validating host health without failing over some timeout waiting for a
+ // connection in the pool. not sure if we should try to keep this connection if it succeeds and if the
+ // pool needs it. for now that seems like an unnecessary added bit of complexity for dealing with this
+ // error state
+ connection = connectionFactory.create(this);
final RequestMessage ping = client.buildMessage(cluster.validationRequest()).create();
final CompletableFuture<ResultSet> f = new CompletableFuture<>();
connection.write(ping, f);
@@ -467,9 +506,13 @@ final class ConnectionPool {
this.cluster.loadBalancingStrategy().onAvailable(h);
return true;
} catch (Exception ex) {
- logger.debug("Failed reconnect attempt on {}", h, ex);
- if (connection != null) definitelyDestroyConnection(connection);
+ logger.error(String.format("Failed reconnect attempt on %s%s%s",
+ h, System.lineSeparator(), this.getPoolInfo()), ex);
return false;
+ } finally {
+ if (connection != null) {
+ connection.closeAsync();
+ }
}
}
@@ -547,7 +590,7 @@ final class ConnectionPool {
*/
public String getPoolInfo(final Connection connectionToCallout) {
final StringBuilder sb = new StringBuilder("ConnectionPool (");
- sb.append(host);
+ sb.append(host.toString());
sb.append(")");
if (connections.isEmpty()) {
@@ -555,25 +598,35 @@ final class ConnectionPool {
} else {
final int connectionCount = connections.size();
sb.append(System.lineSeparator());
- sb.append(String.format("Connection Pool Status (size=%s max=%s min=%s)", connectionCount, maxPoolSize, minPoolSize));
+ sb.append(String.format("Connection Pool Status (size=%s max=%s min=%s toCreate=%s markedOpen=%s bin=%s)",
+ connectionCount, maxPoolSize, minPoolSize, this.scheduledForCreation.get(), this.open.get(), bin.size()));
+ sb.append(System.lineSeparator());
+
+ appendConnections(sb, connectionToCallout, connections);
+ sb.append(System.lineSeparator());
+ sb.append("-- bin --");
sb.append(System.lineSeparator());
+ appendConnections(sb, connectionToCallout, new ArrayList<>(bin));
+ }
- for (int ix = 0; ix < connectionCount; ix++) {
- final Connection c = connections.get(ix);
+ return sb.toString().trim();
+ }
- if (c == connectionToCallout)
- sb.append("==> ");
- else
- sb.append("> ");
+ private void appendConnections(final StringBuilder sb, final Connection connectionToCallout,
+ final List<Connection> connections) {
+ final int connectionCount = connections.size();
+ for (int ix = 0; ix < connectionCount; ix++) {
+ final Connection c = connections.get(ix);
+ if (c.equals(connectionToCallout))
+ sb.append("==> ");
+ else
+ sb.append("> ");
- sb.append(c.getConnectionInfo(false));
+ sb.append(c.getConnectionInfo(false));
- if (ix < connectionCount - 1)
- sb.append(System.lineSeparator());
- }
+ if (ix < connectionCount - 1)
+ sb.append(System.lineSeparator());
}
-
- return sb.toString().trim();
}
@Override
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
index fb0a9e333a..3d492fa612 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
@@ -26,6 +26,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -85,6 +86,23 @@ public final class Host {
}
}
+ void tryReconnectingImmediately(final Function<Host, Boolean> reconnect) {
+ // only do a connection re-attempt if one is not already in progress
+ if (retryInProgress.compareAndSet(Boolean.FALSE, Boolean.TRUE)) {
+ retryThread = this.cluster.hostScheduler().scheduleAtFixedRate(() -> {
+ logger.debug("Trying to reconnect to host at {}", this);
+ final boolean reconnected = reconnect.apply(this);
+ if (reconnected)
+ reconnected();
+ else {
+ logger.warn("Marking {} as unavailable. Trying to reconnect.", this);
+ isAvailable = false;
+ }
+ }, 0,
+ cluster.connectionPoolSettings().reconnectInterval, TimeUnit.MILLISECONDS);
+ }
+ }
+
private void reconnected() {
// race condition! retry boolean could be set to false, a new retryThread created above
// and then cancelled here. But we're only executing this at all because we *have* reconnected
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java
index 1d2a70f7ea..9ccea2dffa 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java
@@ -21,10 +21,14 @@ package org.apache.tinkerpop.gremlin.driver.exception;
public class NoHostAvailableException extends RuntimeException {
public NoHostAvailableException() {
- super("All hosts are considered unavailable due to previous exceptions. Check the error log to find the actual reason.");
+ this("All hosts are considered unavailable due to previous exceptions. Check the error log to find the actual reason.");
}
- public NoHostAvailableException(Throwable ex) {
+ public NoHostAvailableException(final String message) {
+ super(message);
+ }
+
+ public NoHostAvailableException(final Throwable ex) {
super(ex);
}
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
index 8ebad7809e..1417409c47 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
@@ -21,6 +21,8 @@ package org.apache.tinkerpop.gremlin.driver;
import io.netty.handler.codec.CorruptedFrameException;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
import org.apache.tinkerpop.gremlin.server.AbstractGremlinServerIntegrationTest;
import org.apache.tinkerpop.gremlin.server.TestClientFactory;
@@ -28,13 +30,25 @@ import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLHandshakeException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.number.OrderingComparison.greaterThan;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrationTest {
+ private static final org.slf4j.Logger logger = LoggerFactory.getLogger(ClientConnectionIntegrateTest.class);
private Log4jRecordingAppender recordingAppender = null;
private Level previousLogLevel;
@@ -110,4 +124,98 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat
assertThat(recordingAppender.logContainsAny("^(?!.*(isDead=false)).*isDead=true.*destroyed successfully.$"), is(true));
}
+
+ /**
+ * Added for TINKERPOP-2813 - this scenario would have previously thrown tons of
+ * {@link NoHostAvailableException}.
+ */
+ @Test
+ public void shouldSucceedWithJitteryConnection() throws Exception {
+ final Cluster cluster = TestClientFactory.build().minConnectionPoolSize(1).maxConnectionPoolSize(4).
+ reconnectInterval(1000).
+ maxWaitForConnection(4000).validationRequest("g.inject()").create();
+ final Client.ClusteredClient client = cluster.connect();
+
+ client.init();
+
+ // every 3rd connection attempt let's have some problems (only once jittery is switched on below)
+ final JitteryConnectionFactory connectionFactory = new JitteryConnectionFactory(3);
+ client.hostConnectionPools.forEach((h, pool) -> pool.connectionFactory = connectionFactory);
+
+ // get an initial connection which marks the host as available
+ assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
+
+ // network is gonna get fishy - ConnectionPool should try to grow during the workload below and when it
+ // does some connections will fail to create in the background which should log some errors but not tank
+ // the submit() as connections that are currently still working and active should be able to handle the load.
+ connectionFactory.jittery = true;
+
+ // load up a hella ton of requests
+ final int requests = 1000;
+ final CountDownLatch latch = new CountDownLatch(requests);
+ final AtomicBoolean hadFailOtherThanTimeout = new AtomicBoolean(false);
+
+ new Thread(() -> {
+ IntStream.range(0, requests).forEach(i -> {
+ try {
+ client.submitAsync("1 + " + i);
+ } catch (Exception ex) {
+ // we could catch a TimeoutException here in some cases if the jitters cause a borrow of a
+ // connection to take too long. submitAsync() will wrap in a RuntimeException. can't assert
+ // this condition inside this thread or the test locks up
+ hadFailOtherThanTimeout.compareAndSet(false, !(ex.getCause() instanceof TimeoutException));
+ } finally {
+ latch.countDown();
+ }
+ });
+ }, "worker-shouldSucceedWithJitteryConnection").start();
+
+ // wait long enough for the jitters to kick in at least a little
+ while (latch.getCount() > 500) {
+ TimeUnit.MILLISECONDS.sleep(50);
+ }
+
+ // wait for requests to complete
+ assertTrue(latch.await(30000, TimeUnit.MILLISECONDS));
+
+ // make sure we had some failures for sure coming out the factory
+ assertThat(connectionFactory.getNumberOfFailures(), is(greaterThan(0L)));
+
+ // if there was an exception in the worker thread, then it had better be a TimeoutException
+ assertThat(hadFailOtherThanTimeout.get(), is(false));
+
+ cluster.close();
+ }
+
+ /**
+ * Introduces random failures when creating a {@link Connection} for the {@link ConnectionPool}.
+ */
+ public static class JitteryConnectionFactory implements ConnectionFactory {
+
+ private volatile boolean jittery = false;
+ private final AtomicLong connectionsCreated = new AtomicLong(0);
+ private final AtomicLong connectionFailures = new AtomicLong(0);
+ private final int numberOfConnectionsBetweenErrors;
+
+ public JitteryConnectionFactory(final int numberOfConnectionsBetweenErrors) {
+ this.numberOfConnectionsBetweenErrors = numberOfConnectionsBetweenErrors;
+ }
+
+ public long getNumberOfFailures() {
+ return connectionFailures.get();
+ }
+
+ @Override
+ public Connection create(final ConnectionPool pool) {
+
+ // fail creating a connection every numberOfConnectionsBetweenErrors attempts when jittery
+ if (jittery && connectionsCreated.incrementAndGet() % numberOfConnectionsBetweenErrors == 0) {
+ connectionFailures.incrementAndGet();
+ throw new ConnectionException(pool.host.getHostUri(),
+ new SSLHandshakeException("SSL on the funk - server is big mad"));
+ }
+
+ return ConnectionFactory.super.create(pool);
+ }
+ }
}
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
index 2b66ce1de1..6a8acbad15 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.server;
import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizerIntegrateTest;
import org.apache.tinkerpop.gremlin.server.op.OpLoader;
+import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.junit.After;
import org.junit.Before;
@@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.InputStream;
+import java.util.concurrent.CompletableFuture;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assume.assumeThat;
@@ -117,6 +119,10 @@ public abstract class AbstractGremlinServerIntegrationTest {
}
public void startServer() throws Exception {
+ startServerAsync().join();
+ }
+
+ public CompletableFuture<ServerGremlinExecutor> startServerAsync() throws Exception {
final InputStream stream = getSettingsInputStream();
final Settings settings = Settings.read(stream);
overriddenSettings = overrideSettings(settings);
@@ -131,7 +137,7 @@ public abstract class AbstractGremlinServerIntegrationTest {
this.server = new GremlinServer(overriddenSettings);
- server.start().join();
+ return server.start();
}
@After
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index e44660ee50..43a53bae7a 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -55,7 +55,6 @@ import org.apache.tinkerpop.gremlin.util.function.FunctionUtils;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@@ -63,7 +62,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.awt.Color;
-import java.io.File;
import java.net.ConnectException;
import java.time.Instant;
import java.util.ArrayList;