Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2022/11/28 18:33:58 UTC

[tinkerpop] branch TINKERPOP-2813 updated: TINKERPOP-2813 Fixed some issues with Cluster.close()

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch TINKERPOP-2813
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/TINKERPOP-2813 by this push:
     new fd6e91e02b TINKERPOP-2813 Fixed some issues with Cluster.close()
fd6e91e02b is described below

commit fd6e91e02bb70e26939bed058f1a04ecfdf63507
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Mon Nov 28 13:30:08 2022 -0500

    TINKERPOP-2813 Fixed some issues with Cluster.close()
    
    After refactoring to add the new thread pools, some situations arose where connections didn't close cleanly. It isn't clear whether these problems were caused by the thread pool changes or were always present but hidden behind use of the fork/join pool and the normal gremlin pool. Shutdown still isn't fully asynchronous, but it wasn't before these changes either, so that work is probably better saved for a dedicated effort.
---
 .../apache/tinkerpop/gremlin/driver/Cluster.java   | 38 +++++++++++++++-------
 .../tinkerpop/gremlin/driver/Connection.java       | 28 +++++++++-------
 .../tinkerpop/gremlin/driver/ConnectionPool.java   | 19 ++++++++---
 .../driver/ClientConnectionIntegrateTest.java      |  4 ++-
 4 files changed, 61 insertions(+), 28 deletions(-)
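
For context, the close sequence this patch moves Cluster.close() toward looks roughly like the sketch below. It is only an illustrative, self-contained example, not the driver code itself: the class name, the fixed thread pool and the placeholder client close futures are stand-ins for the driver's own executor, schedulers and Client.closeAsync() results. The idea is to join on the per-client close futures first, then shut down the Netty event loop, and only shut down the remaining pools from its termination listener.

    import io.netty.channel.nio.NioEventLoopGroup;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ClusterCloseSketch {
        public static void main(String[] args) {
            final NioEventLoopGroup group = new NioEventLoopGroup();
            final ExecutorService executor = Executors.newFixedThreadPool(2);

            // stand-ins for the futures returned by each Client.closeAsync()
            final List<CompletableFuture<Void>> clientCloseFutures = new ArrayList<>();
            clientCloseFutures.add(CompletableFuture.completedFuture(null));

            // wait for every client to finish closing before touching the event loop
            CompletableFuture.allOf(clientCloseFutures.toArray(new CompletableFuture[0])).join();

            final CompletableFuture<Void> closeIt = new CompletableFuture<>();

            // shut down the event loop with no quiet period, then shut down the
            // remaining pools only after its termination future fires
            group.shutdownGracefully(0, 2, TimeUnit.SECONDS);
            group.terminationFuture().awaitUninterruptibly().addListener(f -> {
                executor.shutdown();
                closeIt.complete(null);
            });

            closeIt.join();
            System.out.println("cluster sketch closed cleanly");
        }
    }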

diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index e93074d3fc..7392deb7d5 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -24,6 +24,7 @@ import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.SslProvider;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.util.concurrent.Future;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
@@ -60,6 +61,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -1021,10 +1023,14 @@ public final class Cluster {
             return b;
         }
 
-        void shutdown() {
+        /**
+         * Gracefully shuts down the event loop and returns the termination future, which signals that all jobs are done.
+         */
+        Future<?> shutdown() {
             // Do not provide a quiet period (default is 2s) to accept more requests. Once we have decided to shutdown,
             // no new requests should be accepted.
-            group.shutdownGracefully(/*quiet period*/0, /*timeout*/2, TimeUnit.SECONDS).awaitUninterruptibly();
+            group.shutdownGracefully(/*quiet period*/0, /*timeout*/2, TimeUnit.SECONDS);
+            return group.terminationFuture();
         }
     }
 
@@ -1219,25 +1225,33 @@ public final class Cluster {
             if (closeFuture.get() != null)
                 return closeFuture.get();
 
+            final List<CompletableFuture<Void>> clientCloseFutures = new ArrayList<>(openedClients.size());
             for (WeakReference<Client> openedClient : openedClients) {
                 final Client client = openedClient.get();
-                if (client != null && !client.isClosing()) {
-                    client.close();
+                if (client != null) {
+                    // best to call close() even if the Client is already closing so that we can be sure that
+                    // any background client closing operations are included in this shutdown future
+                    clientCloseFutures.add(client.closeAsync());
                 }
             }
 
-            final CompletableFuture<Void> closeIt = new CompletableFuture<>();
-            closeFuture.set(closeIt);
+            // when all the clients are fully closed, shut down the netty event loop. it isn't clear why this needs
+            // to block here, but if it doesn't then factory.shutdown() below doesn't seem to ever complete.
+            // ideally this should all be async, but it wasn't before this change either, so it is left as-is for
+            // now since this really isn't the focus of this change
+            CompletableFuture.allOf(clientCloseFutures.toArray(new CompletableFuture[0])).join();
 
-            hostScheduler.submit(() -> {
-                factory.shutdown();
+            final CompletableFuture<Void> closeIt = new CompletableFuture<>();
+            // shut down the event loop. that shutdown can trigger some final jobs to get scheduled, so add a listener
+            // to the termination event to shut down the remaining thread pools
+            factory.shutdown().awaitUninterruptibly().addListener(f -> {
+                executor.shutdown();
+                hostScheduler.shutdown();
+                connectionScheduler.shutdown();
                 closeIt.complete(null);
             });
 
-            // Prevent the executor from accepting new tasks while still allowing enqueued tasks to complete
-            executor.shutdown();
-            connectionScheduler.shutdown();
-            hostScheduler.shutdown();
+            closeFuture.set(closeIt);
 
             return closeIt;
         }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 0dfe0dbab3..1183a3bdc4 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -29,6 +29,7 @@ import io.netty.channel.ChannelPromise;
 import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.net.URI;
+import java.time.Instant;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -56,6 +57,8 @@ final class Connection {
     private final Cluster cluster;
     private final Client client;
     private final ConnectionPool pool;
+    private final String creatingThread;
+    private final String createdTimestamp;
 
     public static final int MAX_IN_PROCESS = 4;
     public static final int MIN_IN_PROCESS = 1;
@@ -95,7 +98,8 @@ final class Connection {
         this.client = pool.getClient();
         this.pool = pool;
         this.maxInProcess = maxInProcess;
-
+        this.creatingThread = Thread.currentThread().getName();
+        this.createdTimestamp = Instant.now().toString();
         connectionLabel = "Connection{host=" + pool.host + "}";
 
         if (cluster.isClosing())
@@ -302,8 +306,6 @@ final class Connection {
         // guess). that seems to put the executor thread in a monitor state that it doesn't recover from. since all
         // the code in here is behind shutdownInitiated the synchronized doesn't seem necessary
         if (shutdownInitiated.compareAndSet(false, true)) {
-            final String connectionInfo = this.getConnectionInfo();
-
             // the session close message was removed in 3.5.0 after deprecation at 3.3.11. That removal was perhaps
             // a bit hasty as session semantics may still require this message in certain cases. Until we can look
             // at this in more detail, it seems best to bring back the old functionality to the driver.
@@ -342,7 +344,8 @@ final class Connection {
                 channelizer.close(channel);
 
             // seems possible that the channelizer could initialize but fail to produce a channel, so worth checking
-            // null before proceeding here
+            // null before proceeding here. also, if the cluster is in shutdown then the event loop could already be
+            // shut down and there will be no way to get a new promise from it.
             if (channel != null) {
                 final ChannelPromise promise = channel.newPromise();
                 promise.addListener(f -> {
@@ -350,7 +353,7 @@ final class Connection {
                         future.completeExceptionally(f.cause());
                     } else {
                         if (logger.isDebugEnabled())
-                            logger.debug("{} destroyed successfully.", connectionInfo);
+                            logger.debug("{} destroyed successfully.", this.getConnectionInfo());
 
                         future.complete(null);
                     }
@@ -365,9 +368,12 @@ final class Connection {
                     }
                 }
             } else {
-                logger.debug("Connection {} is shutting down but the channel was not initialized to begin with",
-                        getConnectionInfo());
+                // if we don't handle the supplied future it can hang the close
+                future.complete(null);
             }
+        } else {
+            // if we don't handle the supplied future it can hang the close
+            future.complete(null);
         }
     }
 
@@ -385,10 +391,10 @@ final class Connection {
      */
     public String getConnectionInfo(final boolean showHost) {
         return showHost ?
-                String.format("Connection{channel=%s host=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s}",
-                        getChannelId(), pool.host.toString(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing()) :
-                String.format("Connection{channel=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s}",
-                        getChannelId(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing());
+                String.format("Connection{channel=%s host=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s created=%s thread=%s}",
+                        getChannelId(), pool.host.toString(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing(), createdTimestamp, creatingThread) :
+                String.format("Connection{channel=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s created=%s thread=%s}",
+                        getChannelId(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing(), createdTimestamp, creatingThread);
     }
 
     /**
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index 4e5f019273..0ec3a4c3af 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -272,14 +272,25 @@ final class ConnectionPool {
         return bin.size();
     }
 
+    /**
+     * Calls close on connections in the pool, gathering close futures from both active connections and ones in the
+     * bin.
+     */
     private CompletableFuture<Void> killAvailableConnections() {
-        final List<CompletableFuture<Void>> futures = new ArrayList<>(connections.size());
+        final List<CompletableFuture<Void>> futures = new ArrayList<>(connections.size() + bin.size());
         for (Connection connection : connections) {
             final CompletableFuture<Void> future = connection.closeAsync();
             future.thenRun(open::decrementAndGet);
             futures.add(future);
         }
 
+        // Without the ones in the bin, the close for the ConnectionPool won't account for their shutdown and could
+        // lead to a scenario where the bin connections stay open after the channel executor is closed, which then
+        // leads to the close operation getting rejected in Connection.close() for channel.newPromise().
+        for (Connection connection : bin) {
+            futures.add(connection.closeAsync());
+        }
+
         return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
     }
 
@@ -368,7 +379,7 @@ final class ConnectionPool {
 
     private boolean destroyConnection(final Connection connection) {
         while (true) {
-            int opened = open.get();
+            final int opened = open.get();
             if (opened <= minPoolSize)
                 return false;
 
@@ -598,8 +609,8 @@ final class ConnectionPool {
         } else {
             final int connectionCount = connections.size();
             sb.append(System.lineSeparator());
-            sb.append(String.format("Connection Pool Status (size=%s max=%s min=%s toCreate=%s markedOpen=%s bin=%s)",
-                    connectionCount, maxPoolSize, minPoolSize, this.scheduledForCreation.get(), this.open.get(), bin.size()));
+            sb.append(String.format("Connection Pool Status (size=%s max=%s min=%s toCreate=%s bin=%s)",
+                    connectionCount, maxPoolSize, minPoolSize, this.scheduledForCreation.get(), bin.size()));
             sb.append(System.lineSeparator());
 
             appendConnections(sb, connectionToCallout, connections);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
index 1417409c47..8899db2e9e 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
@@ -184,6 +184,8 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat
         // if there was a exception in the worker thread, then it had better be a TimeoutException
         assertThat(hadFailOtherThanTimeout.get(), is(false));
 
+        connectionFactory.jittery = false;
+
         cluster.close();
     }
 
@@ -212,7 +214,7 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat
             if (jittery && connectionsCreated.incrementAndGet() % numberOfConnectionsBetweenErrors == 0) {
                 connectionFailures.incrementAndGet();
                 throw new ConnectionException(pool.host.getHostUri(),
-                        new SSLHandshakeException("SSL on the funk - server is big mad"));
+                        new SSLHandshakeException("SSL on the funk - server is big mad with the jitters"));
             }
 
             return ConnectionFactory.super.create(pool);
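
As a closing note on the ConnectionPool portion of the patch, the idea of gathering close futures from both the active connections and the bin can be shown with a small self-contained sketch. The Conn class and the field names below are hypothetical stand-ins, not the driver's actual Connection or pool internals; the point is simply that binned connections get a closeAsync() call too, so they cannot outlive the channel executor.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.atomic.AtomicInteger;

    public class PoolCloseSketch {
        // hypothetical stand-in for the driver's Connection
        static class Conn {
            CompletableFuture<Void> closeAsync() {
                return CompletableFuture.completedFuture(null);
            }
        }

        private final List<Conn> connections = new CopyOnWriteArrayList<>();
        private final List<Conn> bin = new CopyOnWriteArrayList<>();
        private final AtomicInteger open = new AtomicInteger();

        CompletableFuture<Void> killAvailableConnections() {
            // size the list for both active and binned connections so nothing is missed
            final List<CompletableFuture<Void>> futures = new ArrayList<>(connections.size() + bin.size());
            for (Conn c : connections) {
                final CompletableFuture<Void> f = c.closeAsync();
                f.thenRun(open::decrementAndGet);
                futures.add(f);
            }
            // include connections already moved to the bin; otherwise they can stay
            // open after the channel executor is closed and fail on later operations
            for (Conn c : bin) {
                futures.add(c.closeAsync());
            }
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        }

        public static void main(String[] args) {
            final PoolCloseSketch pool = new PoolCloseSketch();
            pool.connections.add(new Conn());
            pool.bin.add(new Conn());
            pool.killAvailableConnections().join();
            System.out.println("all connections closed");
        }
    }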