You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2022/11/28 18:33:58 UTC
[tinkerpop] branch TINKERPOP-2813 updated: TINKERPOP-2813 Fixed some issues with Cluster.close()
This is an automated email from the ASF dual-hosted git repository.
spmallette pushed a commit to branch TINKERPOP-2813
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/TINKERPOP-2813 by this push:
new fd6e91e02b TINKERPOP-2813 Fixed some issues with Cluster.close()
fd6e91e02b is described below
commit fd6e91e02bb70e26939bed058f1a04ecfdf63507
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Mon Nov 28 13:30:08 2022 -0500
TINKERPOP-2813 Fixed some issues with Cluster.close()
After refactoring to add the new thread pools, some situations seemed to arise where connections didn't close cleanly. It is not clear whether this came as a result of the thread pool changes or whether the problems were always present but hidden behind the use of the fork/join pool and the normal gremlin pool. There still isn't a good async shutdown in place, but shutdown wasn't working that way prior to these changes either, so perhaps that is better saved for a dedicated body of work.
---
.../apache/tinkerpop/gremlin/driver/Cluster.java | 38 +++++++++++++++-------
.../tinkerpop/gremlin/driver/Connection.java | 28 +++++++++-------
.../tinkerpop/gremlin/driver/ConnectionPool.java | 19 ++++++++---
.../driver/ClientConnectionIntegrateTest.java | 4 ++-
4 files changed, 61 insertions(+), 28 deletions(-)
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index e93074d3fc..7392deb7d5 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -24,6 +24,7 @@ import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.util.concurrent.Future;
import org.apache.commons.configuration2.Configuration;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
@@ -60,6 +61,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -1021,10 +1023,14 @@ public final class Cluster {
return b;
}
- void shutdown() {
+ /**
+ * Gracefully shuts down the event loop and returns the termination future which signals that all jobs are done.
+ */
+ Future<?> shutdown() {
// Do not provide a quiet period (default is 2s) to accept more requests. Once we have decided to shutdown,
// no new requests should be accepted.
- group.shutdownGracefully(/*quiet period*/0, /*timeout*/2, TimeUnit.SECONDS).awaitUninterruptibly();
+ group.shutdownGracefully(/*quiet period*/0, /*timeout*/2, TimeUnit.SECONDS);
+ return group.terminationFuture();
}
}
@@ -1219,25 +1225,33 @@ public final class Cluster {
if (closeFuture.get() != null)
return closeFuture.get();
+ final List<CompletableFuture<Void>> clientCloseFutures = new ArrayList<>(openedClients.size());
for (WeakReference<Client> openedClient : openedClients) {
final Client client = openedClient.get();
- if (client != null && !client.isClosing()) {
- client.close();
+ if (client != null) {
+ // best to call close() even if the Client is already closing so that we can be sure that
+ // any background client closing operations are included in this shutdown future
+ clientCloseFutures.add(client.closeAsync());
}
}
- final CompletableFuture<Void> closeIt = new CompletableFuture<>();
- closeFuture.set(closeIt);
+ // when all the clients are fully closed then shutdown the netty event loop. not sure why this needs to
+ // block here, but if it doesn't then factory.shutdown() below doesn't seem to want to ever complete.
+ // ideally, this should all be async, but i guess it wasn't before this change so just going to leave it
+ // for now as this really isn't the focus of this change
+ CompletableFuture.allOf(clientCloseFutures.toArray(new CompletableFuture[0])).join();
- hostScheduler.submit(() -> {
- factory.shutdown();
+ final CompletableFuture<Void> closeIt = new CompletableFuture<>();
+ // shutdown the event loop. that shutdown can trigger some final jobs to get scheduled so add a listener
+ // to the termination event to shutdown remaining thread pools
+ factory.shutdown().awaitUninterruptibly().addListener(f -> {
+ executor.shutdown();
+ hostScheduler.shutdown();
+ connectionScheduler.shutdown();
closeIt.complete(null);
});
- // Prevent the executor from accepting new tasks while still allowing enqueued tasks to complete
- executor.shutdown();
- connectionScheduler.shutdown();
- hostScheduler.shutdown();
+ closeFuture.set(closeIt);
return closeIt;
}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 0dfe0dbab3..1183a3bdc4 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -29,6 +29,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.URI;
+import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -56,6 +57,8 @@ final class Connection {
private final Cluster cluster;
private final Client client;
private final ConnectionPool pool;
+ private final String creatingThread;
+ private final String createdTimestamp;
public static final int MAX_IN_PROCESS = 4;
public static final int MIN_IN_PROCESS = 1;
@@ -95,7 +98,8 @@ final class Connection {
this.client = pool.getClient();
this.pool = pool;
this.maxInProcess = maxInProcess;
-
+ this.creatingThread = Thread.currentThread().getName();
+ this.createdTimestamp = Instant.now().toString();
connectionLabel = "Connection{host=" + pool.host + "}";
if (cluster.isClosing())
@@ -302,8 +306,6 @@ final class Connection {
// guess). that seems to put the executor thread in a monitor state that it doesn't recover from. since all
// the code in here is behind shutdownInitiated the synchronized doesn't seem necessary
if (shutdownInitiated.compareAndSet(false, true)) {
- final String connectionInfo = this.getConnectionInfo();
-
// the session close message was removed in 3.5.0 after deprecation at 3.3.11. That removal was perhaps
// a bit hasty as session semantics may still require this message in certain cases. Until we can look
// at this in more detail, it seems best to bring back the old functionality to the driver.
@@ -342,7 +344,8 @@ final class Connection {
channelizer.close(channel);
// seems possible that the channelizer could initialize but fail to produce a channel, so worth checking
- // null before proceeding here
+ // null before proceeding here. also if the cluster is in shutdown then the event loop could be shutdown
+ // already and there will be no way to get a new promise out there.
if (channel != null) {
final ChannelPromise promise = channel.newPromise();
promise.addListener(f -> {
@@ -350,7 +353,7 @@ final class Connection {
future.completeExceptionally(f.cause());
} else {
if (logger.isDebugEnabled())
- logger.debug("{} destroyed successfully.", connectionInfo);
+ logger.debug("{} destroyed successfully.", this.getConnectionInfo());
future.complete(null);
}
@@ -365,9 +368,12 @@ final class Connection {
}
}
} else {
- logger.debug("Connection {} is shutting down but the channel was not initialized to begin with",
- getConnectionInfo());
+ // if we don't handle the supplied future it can hang the close
+ future.complete(null);
}
+ } else {
+ // if we don't handle the supplied future it can hang the close
+ future.complete(null);
}
}
@@ -385,10 +391,10 @@ final class Connection {
*/
public String getConnectionInfo(final boolean showHost) {
return showHost ?
- String.format("Connection{channel=%s host=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s}",
- getChannelId(), pool.host.toString(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing()) :
- String.format("Connection{channel=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s}",
- getChannelId(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing());
+ String.format("Connection{channel=%s host=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s created=%s thread=%s}",
+ getChannelId(), pool.host.toString(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing(), createdTimestamp, creatingThread) :
+ String.format("Connection{channel=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s created=%s thread=%s}",
+ getChannelId(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing(), createdTimestamp, creatingThread);
}
/**
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index 4e5f019273..0ec3a4c3af 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -272,14 +272,25 @@ final class ConnectionPool {
return bin.size();
}
+ /**
+ * Calls close on connections in the pool gathering close futures from both active connections and ones in the
+ * bin.
+ */
private CompletableFuture<Void> killAvailableConnections() {
- final List<CompletableFuture<Void>> futures = new ArrayList<>(connections.size());
+ final List<CompletableFuture<Void>> futures = new ArrayList<>(connections.size() + bin.size());
for (Connection connection : connections) {
final CompletableFuture<Void> future = connection.closeAsync();
future.thenRun(open::decrementAndGet);
futures.add(future);
}
+ // Without the ones in the bin the close for the ConnectionPool won't account for their shutdown and could
+ // lead to scenario where the bin connections stay open after the channel executor is closed which then
+ // leads to close operation getting rejected in Connection.close() for channel.newPromise().
+ for (Connection connection : bin) {
+ futures.add(connection.closeAsync());
+ }
+
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
@@ -368,7 +379,7 @@ final class ConnectionPool {
private boolean destroyConnection(final Connection connection) {
while (true) {
- int opened = open.get();
+ final int opened = open.get();
if (opened <= minPoolSize)
return false;
@@ -598,8 +609,8 @@ final class ConnectionPool {
} else {
final int connectionCount = connections.size();
sb.append(System.lineSeparator());
- sb.append(String.format("Connection Pool Status (size=%s max=%s min=%s toCreate=%s markedOpen=%s bin=%s)",
- connectionCount, maxPoolSize, minPoolSize, this.scheduledForCreation.get(), this.open.get(), bin.size()));
+ sb.append(String.format("Connection Pool Status (size=%s max=%s min=%s toCreate=%s bin=%s)",
+ connectionCount, maxPoolSize, minPoolSize, this.scheduledForCreation.get(), bin.size()));
sb.append(System.lineSeparator());
appendConnections(sb, connectionToCallout, connections);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
index 1417409c47..8899db2e9e 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
@@ -184,6 +184,8 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat
// if there was a exception in the worker thread, then it had better be a TimeoutException
assertThat(hadFailOtherThanTimeout.get(), is(false));
+ connectionFactory.jittery = false;
+
cluster.close();
}
@@ -212,7 +214,7 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat
if (jittery && connectionsCreated.incrementAndGet() % numberOfConnectionsBetweenErrors == 0) {
connectionFailures.incrementAndGet();
throw new ConnectionException(pool.host.getHostUri(),
- new SSLHandshakeException("SSL on the funk - server is big mad"));
+ new SSLHandshakeException("SSL on the funk - server is big mad with the jitters"));
}
return ConnectionFactory.super.create(pool);