Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2022/10/17 13:09:46 UTC

[tinkerpop] 02/02: TINKERPOP-2813 Refactored job scheduling and thread pools

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch TINKERPOP-2813
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 741fa512e06aa36bd72aa6056782173c0200d6ab
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Mon Oct 17 08:47:01 2022 -0400

    TINKERPOP-2813 Refactored job scheduling and thread pools
    
    Went back to parallelized ConnectionPool creation and removed the single-threaded executor initialization that was put in long ago to prevent deadlocks arising from use of the default thread pool alone. Stopped using the fork-join pool for all jobs. Added a new thread pool that distinguishes Connection jobs from Host jobs to prevent deadlock problems. The single-thread approach of the old scheduler thread pool seemed wrong because it seems possible that reconnect retries sc [...]
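
    To make the failure mode concrete, the following minimal, self-contained
    sketch (hypothetical code, not part of this patch) shows the shape of the
    TINKERPOP-2550 deadlock: a task that blocks on a second task submitted to
    the same single-threaded pool can never complete, because the pool's only
    thread is the one doing the waiting.

        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        import java.util.concurrent.Future;
        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.TimeoutException;

        // Minimal illustration (not TinkerPop code): the outer task holds the
        // pool's only thread while waiting on the inner task, so the inner
        // task can never start.
        public class SingleThreadPoolDeadlock {
            public static void main(String[] args) throws Exception {
                final ExecutorService pool = Executors.newSingleThreadExecutor();
                final Future<?> outer = pool.submit(() -> {
                    final Future<?> inner = pool.submit(() -> System.out.println("inner ran"));
                    try {
                        inner.get(2, TimeUnit.SECONDS); // would block forever without the timeout
                    } catch (TimeoutException e) {
                        System.out.println("deadlocked: inner never got a thread");
                    } catch (Exception ignored) {
                    }
                });
                outer.get();
                pool.shutdownNow();
            }
        }
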
---
 CHANGELOG.asciidoc                                 |  3 ++
 .../apache/tinkerpop/gremlin/driver/Client.java    | 36 ++++++--------------
 .../apache/tinkerpop/gremlin/driver/Cluster.java   | 39 +++++++++++++++++-----
 .../tinkerpop/gremlin/driver/Connection.java       | 10 +++---
 .../tinkerpop/gremlin/driver/ConnectionPool.java   |  4 +--
 .../org/apache/tinkerpop/gremlin/driver/Host.java  |  2 +-
 .../apache/tinkerpop/gremlin/driver/Settings.java  |  1 -
 .../tinkerpop/gremlin/driver/ClientTest.java       |  2 +-
 .../WebSocketClientBehaviorIntegrateTest.java      |  2 +-
 9 files changed, 54 insertions(+), 45 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index d1ffe27e80..aa4756d3fa 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -36,6 +36,9 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 * Async operations in .NET can now be cancelled. This however does not cancel work that is already happening on the server.
 * Bumped to `snakeyaml` 1.32 to fix security vulnerability.
 * Improved logging for `gremlin-driver`.
+* Modified `Connection` and `Host` job scheduling in `gremlin-driver` by dividing their work between two dedicated thread pools, sparing the primary pool responsible for submitting requests and reading results.
+* Prevented usage of the fork-join pool for `gremlin-driver` job scheduling.
+* Changed `Host` initialization within a `Client` to be parallel again in `gremlin-driver`.
 
 ==== Bugs
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 89dfcc780e..bca2932d7a 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -18,7 +18,6 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,8 +45,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -518,26 +515,15 @@ public abstract class Client {
          */
         @Override
         protected void initializeImplementation() {
-            // use a special executor here to initialize the Host instances as the worker thread pool may be
-            // insufficiently sized for this task and the parallel initialization of the ConnectionPool. if too small
-            // tasks may be schedule in such a way as to produce a deadlock: TINKERPOP-2550
-            //
-            // the cost of this single threaded executor here should be fairly small because it is only used once at
-            // initialization and shutdown. since users will typically construct a Client once for the life of their
-            // application there shouldn't be tons of thread pools being created and destroyed.
-            final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("gremlin-driver-initializer").build();
-            final ExecutorService hostExecutor = Executors.newSingleThreadExecutor(threadFactory);
-
             try {
                 CompletableFuture.allOf(cluster.allHosts().stream()
-                                .map(host -> CompletableFuture.runAsync(() -> initializeConnectionSetupForHost.accept(host), hostExecutor))
+                                .map(host -> CompletableFuture.runAsync(
+                                        () -> initializeConnectionSetupForHost.accept(host), cluster.hostScheduler()))
                                 .toArray(CompletableFuture[]::new))
                         .join();
             } catch (CompletionException ex) {
                 logger.error("Initialization failed", ex);
                 this.initializationFailure = ex;
-            } finally {
-                hostExecutor.shutdown();
             }
 
             // throw an error if there is no host available after initializing connection pool.
@@ -548,7 +534,7 @@ public abstract class Client {
             final List<Host> unavailableHosts = cluster.allHosts()
                     .stream().filter(host -> !host.isAvailable()).collect(Collectors.toList());
             if (!unavailableHosts.isEmpty()) {
-                CompletableFuture.runAsync(() -> handleUnavailableHosts(unavailableHosts));
+                handleUnavailableHosts(unavailableHosts);
             }
         }
 
@@ -596,15 +582,15 @@ public abstract class Client {
             }
         };
 
-        private void handleUnavailableHosts(List<Host> unavailableHosts) {
+        private void handleUnavailableHosts(final List<Host> unavailableHosts) {
             // start the re-initialization attempt for each of the unavailable hosts through Host.makeUnavailable().
-            try {
-                CompletableFuture.allOf(unavailableHosts.stream()
-                                .map(host -> CompletableFuture.runAsync(() -> host.makeUnavailable(this::tryReInitializeHost)))
-                                .toArray(CompletableFuture[]::new))
-                        .join();
-            } catch (CompletionException ex) {
-                logger.error("", (ex.getCause() == null) ? ex : ex.getCause());
+            for (Host host : unavailableHosts) {
+                final CompletableFuture<Void> f = CompletableFuture.runAsync(
+                        () -> host.makeUnavailable(this::tryReInitializeHost), cluster.hostScheduler());
+                f.exceptionally(t -> {
+                    logger.error("", (t.getCause() == null) ? t : t.getCause());
+                    return null;
+                });
             }
         }
 
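The reworked handleUnavailableHosts() above is fire-and-forget: each host's
recovery runs independently on the hostScheduler, and a failure for one host is
logged without blocking the caller or delaying the other hosts. A toy, runnable
sketch of that pattern (hypothetical names, not part of this patch):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Each "host" recovery is submitted independently; exceptionally() logs
    // the unwrapped cause, mirroring the error handling in the patch above.
    public class FireAndForgetRecovery {
        public static void main(String[] args) throws Exception {
            final ScheduledExecutorService hostScheduler = Executors.newScheduledThreadPool(2);
            final List<String> hosts = Arrays.asList("host-a", "host-b");
            for (String host : hosts) {
                CompletableFuture.runAsync(() -> {
                    if (host.equals("host-b"))
                        throw new IllegalStateException("recovery failed for " + host);
                    System.out.println("recovered " + host);
                }, hostScheduler).exceptionally(t -> {
                    System.err.println((t.getCause() == null ? t : t.getCause()).getMessage());
                    return null;
                });
            }
            hostScheduler.shutdown();
            hostScheduler.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
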
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index adc20f7063..e93074d3fc 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -34,7 +34,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 
 import java.io.File;
@@ -476,8 +475,12 @@ public final class Cluster {
         return manager.executor;
     }
 
-    ScheduledExecutorService scheduler() {
-        return manager.scheduler;
+    ScheduledExecutorService hostScheduler() {
+        return manager.hostScheduler;
+    }
+
+    ScheduledExecutorService connectionScheduler() {
+        return manager.connectionScheduler;
     }
 
     Settings.ConnectionPoolSettings connectionPoolSettings() {
@@ -1038,8 +1041,20 @@ public final class Cluster {
         private final Supplier<RequestMessage.Builder> validationRequest;
         private final HandshakeInterceptor interceptor;
 
+        /**
+         * Thread pool for requests.
+         */
         private final ScheduledThreadPoolExecutor executor;
-        private final ScheduledThreadPoolExecutor scheduler;
+
+        /**
+         * Thread pool for background work related to the {@link Host}.
+         */
+        private final ScheduledThreadPoolExecutor hostScheduler;
+
+        /**
+         * Thread pool for background work related to the {@link Connection} and {@link ConnectionPool}.
+         */
+        private final ScheduledThreadPoolExecutor connectionScheduler;
 
         private final int nioPoolSize;
         private final int workerPoolSize;
@@ -1102,9 +1117,14 @@ public final class Cluster {
             // the executor above should be reserved for reading/writing background tasks that won't interfere with each
             // other if the thread pool is 1 otherwise tasks may be scheduled in such a way as to produce a deadlock
             // as in TINKERPOP-2550. not sure if there is a way to only require the worker pool for all of this. as it
-            // sits now the worker pool probably doesn't need to be a scheduled executor type
-            this.scheduler = new ScheduledThreadPoolExecutor(1,
-                    new BasicThreadFactory.Builder().namingPattern("gremlin-driver-scheduler").build());
+            // sits now the worker pool probably doesn't need to be a scheduled executor type.
+            this.hostScheduler = new ScheduledThreadPoolExecutor(contactPoints.size() + 1,
+                    new BasicThreadFactory.Builder().namingPattern("gremlin-driver-host-scheduler-%d").build());
+
+            // we distinguish between the hostScheduler and the connectionScheduler because you can end up in a
+            // deadlock if all the possible jobs the driver allows for go to a single thread pool.
+            this.connectionScheduler = new ScheduledThreadPoolExecutor(contactPoints.size() + 1,
+                    new BasicThreadFactory.Builder().namingPattern("gremlin-driver-conn-scheduler-%d").build());
 
             validationRequest = () -> RequestMessage.build(Tokens.OPS_EVAL).add(Tokens.ARGS_GREMLIN, builder.validationRequest);
         }
@@ -1209,14 +1229,15 @@ public final class Cluster {
             final CompletableFuture<Void> closeIt = new CompletableFuture<>();
             closeFuture.set(closeIt);
 
-            scheduler.submit(() -> {
+            hostScheduler.submit(() -> {
                 factory.shutdown();
                 closeIt.complete(null);
             });
 
             // Prevent the executor from accepting new tasks while still allowing enqueued tasks to complete
             executor.shutdown();
-            scheduler.shutdown();
+            connectionScheduler.shutdown();
+            hostScheduler.shutdown();
 
             return closeIt;
         }
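
The sizing of contactPoints.size() + 1 gives each pool at least one thread per
configured host plus one spare, and the shutdown ordering above stops intake on
both pools while letting already-enqueued jobs drain. A hypothetical standalone
sketch of that lifecycle (assumes commons-lang3 for BasicThreadFactory, as the
driver itself does):

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.apache.commons.lang3.concurrent.BasicThreadFactory;

    // Two independently sized pools: Host jobs (reconnect retries, host
    // initialization) never compete with Connection jobs (create/destroy).
    class PoolLifecycleSketch {
        private final ScheduledThreadPoolExecutor hostScheduler;
        private final ScheduledThreadPoolExecutor connectionScheduler;

        PoolLifecycleSketch(final int contactPoints) {
            hostScheduler = new ScheduledThreadPoolExecutor(contactPoints + 1,
                    new BasicThreadFactory.Builder().namingPattern("host-scheduler-%d").build());
            connectionScheduler = new ScheduledThreadPoolExecutor(contactPoints + 1,
                    new BasicThreadFactory.Builder().namingPattern("conn-scheduler-%d").build());
        }

        void close() throws InterruptedException {
            // shutdown() rejects new jobs but lets queued ones complete
            connectionScheduler.shutdown();
            hostScheduler.shutdown();
            connectionScheduler.awaitTermination(30, TimeUnit.SECONDS);
            hostScheduler.awaitTermination(30, TimeUnit.SECONDS);
        }
    }
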
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index e2f5717714..81dd440891 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -131,14 +131,14 @@ final class Connection {
                             "Server closed the Connection on channel %s - scheduling removal from %s",
                             channel.id().asShortText(), thisConnection.pool.getPoolInfo(thisConnection)));
 
-                    // delegate the task to worker thread and free up the event loop
-                    thisConnection.cluster.executor().submit(() -> thisConnection.pool.definitelyDestroyConnection(thisConnection));
+                    // delegate the task to a scheduler thread and free up the event loop
+                    thisConnection.cluster.connectionScheduler().submit(() -> thisConnection.pool.definitelyDestroyConnection(thisConnection));
                 }
             });
 
             logger.info("Created new connection for {}", uri);
         } catch (Exception ex) {
-            throw new ConnectionException(uri, "Could not open " + this.toString(), ex);
+            throw new ConnectionException(uri, "Could not open " + this.getConnectionInfo(true), ex);
         }
     }
 
@@ -201,7 +201,7 @@ final class Connection {
                 shutdown(future);
         } else {
             // there may be some pending requests. schedule a job to wait for those to complete and then shutdown
-            new CheckForPending(future).runUntilDone(cluster.executor());
+            new CheckForPending(future).runUntilDone(cluster.connectionScheduler());
         }
 
         return future;
@@ -376,7 +376,7 @@ final class Connection {
     public String getConnectionInfo(final boolean showHost) {
         return showHost ?
                 String.format("Connection{channel=%s, host=%s, isDead=%s, borrowed=%s, pending=%s}",
-                channel.id().asShortText(), pool.host, isDead(), borrowed, pending.size()) :
+                getChannelId(), pool.host, isDead(), borrowed, pending.size()) :
                 String.format("Connection{channel=%s, isDead=%s, borrowed=%s, pending=%s}",
                         channel.id().asShortText(), isDead(), borrowed, pending.size());
     }
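
The close-listener change above follows a general rule for event-loop I/O:
callbacks on the event loop should only enqueue work, while potentially
blocking cleanup runs on an application pool. A generic sketch of that rule
(hypothetical names, no Netty dependency):

    import java.util.concurrent.ExecutorService;

    // Returning immediately from the callback keeps the event loop free to
    // service other channels while the cleanup runs elsewhere.
    final class EventLoopOffload {
        private final ExecutorService connectionScheduler;

        EventLoopOffload(final ExecutorService connectionScheduler) {
            this.connectionScheduler = connectionScheduler;
        }

        // imagine this invoked from a channel close-future listener
        void onChannelClosed(final Runnable destroyConnection) {
            connectionScheduler.submit(destroyConnection);
        }
    }
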
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index f1d8f69eb7..b1a9632738 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -105,7 +105,7 @@ final class ConnectionPool {
                     } catch (ConnectionException e) {
                         throw new CompletionException(e);
                     }
-                }, cluster.executor()));
+                }, cluster.connectionScheduler()));
             }
 
             CompletableFuture.allOf(connectionCreationFutures.toArray(new CompletableFuture[0])).join();
@@ -311,7 +311,7 @@ final class ConnectionPool {
     }
 
     private void newConnection() {
-        cluster.executor().submit(() -> {
+        cluster.connectionScheduler().submit(() -> {
             addConnectionIfUnderMaximum();
             scheduledForCreation.decrementAndGet();
             return null;
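
The pool-fill change above restores parallel ConnectionPool creation: each
connection is opened as its own job on the dedicated connectionScheduler,
checked exceptions are wrapped in CompletionException so they surface through
join(), and the caller blocks once for the whole batch. A toy, runnable sketch
(hypothetical names, not part of this patch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ParallelPoolFill {
        // stand-in for opening a connection (WebSocket/SSL handshake etc.)
        static String openConnection(final int i) throws Exception {
            Thread.sleep(100);
            return "connection-" + i;
        }

        public static void main(String[] args) {
            final ExecutorService connectionScheduler = Executors.newScheduledThreadPool(4);
            final List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < 4; i++) {
                final int n = i;
                futures.add(CompletableFuture.supplyAsync(() -> {
                    try {
                        return openConnection(n);
                    } catch (Exception e) {
                        throw new CompletionException(e); // surfaces through join()
                    }
                }, connectionScheduler));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            futures.forEach(f -> System.out.println(f.join()));
            connectionScheduler.shutdown();
        }
    }
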
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
index afac54ce5e..fb0a9e333a 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
@@ -77,7 +77,7 @@ public final class Host {
 
         // only do a connection re-attempt if one is not already in progress
         if (retryInProgress.compareAndSet(Boolean.FALSE, Boolean.TRUE)) {
-            retryThread = this.cluster.scheduler().scheduleAtFixedRate(() -> {
+            retryThread = this.cluster.hostScheduler().scheduleAtFixedRate(() -> {
                     logger.debug("Trying to reconnect to dead host at {}", this);
                     if (reconnect.apply(this)) reconnected();
                 }, cluster.connectionPoolSettings().reconnectInterval,
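
Moving the retry above to the hostScheduler means a host stuck in a long
reconnect loop occupies a Host-pool thread rather than one shared with
Connection jobs. A toy, runnable sketch of the same fixed-rate retry shape
(hypothetical names, not part of this patch):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    // Retry at a fixed rate until a reconnect attempt succeeds, then cancel
    // the scheduled task, analogous to Host.makeUnavailable()/reconnected().
    public class ReconnectRetrySketch {
        public static void main(String[] args) throws Exception {
            final ScheduledExecutorService hostScheduler = Executors.newScheduledThreadPool(1);
            final AtomicInteger attempts = new AtomicInteger();
            final ScheduledFuture<?>[] retry = new ScheduledFuture<?>[1];
            retry[0] = hostScheduler.scheduleAtFixedRate(() -> {
                final boolean reconnected = attempts.incrementAndGet() >= 3; // succeeds on try 3
                System.out.println("attempt " + attempts.get() + " reconnected=" + reconnected);
                if (reconnected) retry[0].cancel(false);
            }, 1000, 1000, TimeUnit.MILLISECONDS);
            Thread.sleep(4500); // let the demo run, then tear down
            hostScheduler.shutdownNow();
        }
    }
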
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
index 0e8ae0fdf3..38608769ca 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
@@ -395,7 +395,6 @@ final class Settings {
         public String validationRequest = "''";
 
         /**
-         *
          * Duration of time in milliseconds provided for connection setup to complete which includes WebSocket
          * handshake and SSL handshake. Beyond this duration an exception would be thrown if the handshake is not
          * complete by then.
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
index 1ca2aeb845..1ab61b3cfc 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
@@ -58,7 +58,7 @@ public class ClientTest {
         when(mockAvailableHost.isAvailable()).thenReturn(true);
         when(cluster.allHosts()).thenReturn(Collections.singletonList(mockAvailableHost));
         when(cluster.executor()).thenReturn(executor);
-        when(cluster.scheduler()).thenReturn(scheduler);
+        when(cluster.hostScheduler()).thenReturn(scheduler);
     }
 
     @After
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/WebSocketClientBehaviorIntegrateTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/WebSocketClientBehaviorIntegrateTest.java
index 0eb34f6867..549aa75855 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/WebSocketClientBehaviorIntegrateTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/WebSocketClientBehaviorIntegrateTest.java
@@ -96,7 +96,7 @@ public class WebSocketClientBehaviorIntegrateTest {
      */
     @Test
     public void shouldNotDeadlockOnInitialization() throws Exception {
-        // it seems you cah add the same host more than once so while kinda weird it is helpful in faithfully
+        // it seems you can add the same host more than once so while kinda weird it is helpful in faithfully
         // recreating the deadlock situation, though it can/will happen with just one host. workerPoolSize at
         // "1" also helps faithfully reproduce the problem though it can happen at larger pool sizes depending
         // on the timing/interleaving of tasks. the larger connection pool sizes may not be required given the