You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by he...@apache.org on 2024/03/05 23:48:30 UTC

(pulsar) branch master updated: [improve][client] add physicalAddress as part of connection pool key (#22196)

This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e2f94dc98db [improve][client] add physicalAddress as part of connection pool key (#22196)
e2f94dc98db is described below

commit e2f94dc98dbecb4dc401ba837c54f497ca9d896f
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Tue Mar 5 15:48:24 2024 -0800

    [improve][client] add physicalAddress as part of connection pool key (#22196)
---
 .../impl/AutoCloseUselessClientConSupports.java    | 15 +----
 .../apache/pulsar/client/impl/ConnectionPool.java  | 77 ++++++++++------------
 2 files changed, 34 insertions(+), 58 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java
index e03b1709137..c9f478969a6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java
@@ -18,19 +18,14 @@
  */
 package org.apache.pulsar.client.impl;
 
-import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.HashSet;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import org.apache.pulsar.broker.MultiBrokerBaseTest;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -71,16 +66,8 @@ public class AutoCloseUselessClientConSupports extends MultiBrokerBaseTest {
     protected void trigReleaseConnection(PulsarClientImpl pulsarClient)
             throws InterruptedException, NoSuchFieldException, IllegalAccessException {
         // Wait for every request has been response.
-        Field field = ConnectionPool.class.getDeclaredField("pool");
-        field.setAccessible(true);
-        ConcurrentHashMap<InetSocketAddress,ConcurrentMap<Integer,
-                CompletableFuture<ClientCnx>>> pool =
-                (ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer,
-                        CompletableFuture<ClientCnx>>>) field.get(pulsarClient.getCnxPool());
-        final List<CompletableFuture<ClientCnx>> clientCnxWrapList =
-                pool.values().stream().flatMap(c -> c.values().stream()).collect(Collectors.toList());
         Awaitility.waitAtMost(Duration.ofSeconds(5)).until(() -> {
-            for (CompletableFuture<ClientCnx> clientCnxWrapFuture : clientCnxWrapList){
+            for (CompletableFuture<ClientCnx> clientCnxWrapFuture : pulsarClient.getCnxPool().getConnections()){
                 if (!clientCnxWrapFuture.isDone()){
                     continue;
                 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 9750911b37c..850e805067d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import lombok.Value;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
@@ -64,7 +65,7 @@ public class ConnectionPool implements AutoCloseable {
 
     public static final int IDLE_DETECTION_INTERVAL_SECONDS_MIN = 60;
 
-    protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
+    protected final ConcurrentMap<Key, CompletableFuture<ClientCnx>> pool;
 
     private final Bootstrap bootstrap;
     private final PulsarChannelInitializer channelInitializerHandler;
@@ -87,6 +88,14 @@ public class ConnectionPool implements AutoCloseable {
     /** Async release useless connections task. **/
     private ScheduledFuture asyncReleaseUselessConnectionsTask;
 
+
+    @Value
+    private static class Key {
+        InetSocketAddress logicalAddress;
+        InetSocketAddress physicalAddress;
+        int randomKey;
+    }
+
     public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
         this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
     }
@@ -185,7 +194,7 @@ public class ConnectionPool implements AutoCloseable {
     }
 
     void closeAllConnections() {
-        pool.values().forEach(map -> map.values().forEach(future -> {
+        pool.values().forEach(future -> {
             if (future.isDone()) {
                 if (!future.isCompletedExceptionally()) {
                     // Connection was already created successfully, the join will not throw any exception
@@ -198,10 +207,9 @@ public class ConnectionPool implements AutoCloseable {
                 // succeed
                 future.thenAccept(ClientCnx::close);
             }
-        }));
+        });
     }
-
-    /**
+            /**
      * Get a connection from the pool.
      * <p>
      * The connection can either be created or be coming from the pool itself.
@@ -222,51 +230,44 @@ public class ConnectionPool implements AutoCloseable {
             InetSocketAddress physicalAddress, final int randomKey) {
         if (maxConnectionsPerHosts == 0) {
             // Disable pooling
-            return createConnection(logicalAddress, physicalAddress, -1);
+            return createConnection(new Key(logicalAddress, physicalAddress, -1));
         }
-
-        final ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
-                pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>());
-        CompletableFuture<ClientCnx> completableFuture = innerPool
-                .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
+        Key key = new Key(logicalAddress, physicalAddress, randomKey);
+        CompletableFuture<ClientCnx> completableFuture = pool.computeIfAbsent(key, k -> createConnection(key));
         if (completableFuture.isCompletedExceptionally()) {
             // we cannot cache a failed connection, so we remove it from the pool
             // there is a race condition in which
             // cleanupConnection is called before caching this result
             // and so the clean up fails
-            cleanupConnection(logicalAddress, randomKey, completableFuture);
+            pool.remove(key, completableFuture);
             return completableFuture;
         }
 
         return completableFuture.thenCompose(clientCnx -> {
             // If connection already release, create a new one.
             if (clientCnx.getIdleState().isReleased()) {
-                cleanupConnection(logicalAddress, randomKey, completableFuture);
-                return innerPool
-                        .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
+                pool.remove(key, completableFuture);
+                return pool.computeIfAbsent(key, k -> createConnection(key));
             }
             // Try use exists connection.
             if (clientCnx.getIdleState().tryMarkUsingAndClearIdleTime()) {
                 return CompletableFuture.completedFuture(clientCnx);
             } else {
                 // If connection already release, create a new one.
-                cleanupConnection(logicalAddress, randomKey, completableFuture);
-                return innerPool
-                        .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
+                pool.remove(key, completableFuture);
+                return pool.computeIfAbsent(key, k -> createConnection(key));
             }
         });
     }
 
-    private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalAddress,
-            InetSocketAddress physicalAddress, int connectionKey) {
+    private CompletableFuture<ClientCnx> createConnection(Key key) {
         if (log.isDebugEnabled()) {
-            log.debug("Connection for {} not found in cache", logicalAddress);
+            log.debug("Connection for {} not found in cache", key.logicalAddress);
         }
 
         final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<>();
-
         // Trigger async connect to broker
-        createConnection(logicalAddress, physicalAddress).thenAccept(channel -> {
+        createConnection(key.logicalAddress, key.physicalAddress).thenAccept(channel -> {
             log.info("[{}] Connected to server", channel);
 
             channel.closeFuture().addListener(v -> {
@@ -274,7 +275,7 @@ public class ConnectionPool implements AutoCloseable {
                 if (log.isDebugEnabled()) {
                     log.debug("Removing closed connection from pool: {}", v);
                 }
-                cleanupConnection(logicalAddress, connectionKey, cnxFuture);
+                pool.remove(key, cnxFuture);
             });
 
             // We are connected to broker, but need to wait until the connect/connected handshake is
@@ -300,14 +301,14 @@ public class ConnectionPool implements AutoCloseable {
                 // CompletableFuture is cached into the "pool" map,
                 // it is not enough to clean it here, we need to clean it
                 // in the "pool" map when the CompletableFuture is cached
-                cleanupConnection(logicalAddress, connectionKey, cnxFuture);
+                pool.remove(key, cnxFuture);
                 cnx.ctx().close();
                 return null;
             });
         }).exceptionally(exception -> {
             eventLoopGroup.execute(() -> {
-                log.warn("Failed to open connection to {} : {}", physicalAddress, exception.getMessage());
-                cleanupConnection(logicalAddress, connectionKey, cnxFuture);
+                log.warn("Failed to open connection to {} : {}", key.physicalAddress, exception.getMessage());
+                pool.remove(key, cnxFuture);
                 cnxFuture.completeExceptionally(new PulsarClientException(exception));
             });
             return null;
@@ -439,17 +440,9 @@ public class ConnectionPool implements AutoCloseable {
         }
     }
 
-    private void cleanupConnection(InetSocketAddress address, int connectionKey,
-            CompletableFuture<ClientCnx> connectionFuture) {
-        ConcurrentMap<Integer, CompletableFuture<ClientCnx>> map = pool.get(address);
-        if (map != null) {
-            map.remove(connectionKey, connectionFuture);
-        }
-    }
-
     @VisibleForTesting
     int getPoolSize() {
-        return pool.values().stream().mapToInt(Map::size).sum();
+        return pool.size();
     }
 
     private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);
@@ -459,11 +452,8 @@ public class ConnectionPool implements AutoCloseable {
             return;
         }
         List<Runnable> releaseIdleConnectionTaskList = new ArrayList<>();
-        for (Map.Entry<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> entry :
-                pool.entrySet()){
-            ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool = entry.getValue();
-            for (Map.Entry<Integer, CompletableFuture<ClientCnx>> entry0 : innerPool.entrySet()) {
-                CompletableFuture<ClientCnx> future = entry0.getValue();
+        for (Map.Entry<Key,  CompletableFuture<ClientCnx>> entry : pool.entrySet()) {
+                CompletableFuture<ClientCnx> future = entry.getValue();
                 // Ensure connection has been connected.
                 if (!future.isDone()) {
                     continue;
@@ -481,18 +471,17 @@ public class ConnectionPool implements AutoCloseable {
                 if (clientCnx.getIdleState().isReleasing()) {
                     releaseIdleConnectionTaskList.add(() -> {
                         if (clientCnx.getIdleState().tryMarkReleasedAndCloseConnection()) {
-                            cleanupConnection(entry.getKey(), entry0.getKey(), future);
+                            pool.remove(entry.getKey(), future);
                         }
                     });
                 }
             }
-        }
         // Do release idle connections.
         releaseIdleConnectionTaskList.forEach(Runnable::run);
     }
 
     public Set<CompletableFuture<ClientCnx>> getConnections() {
         return Collections.unmodifiableSet(
-                pool.values().stream().flatMap(n -> n.values().stream()).collect(Collectors.toSet()));
+                pool.values().stream().collect(Collectors.toSet()));
     }
 }