You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by he...@apache.org on 2024/03/05 23:48:30 UTC
(pulsar) branch master updated: [improve][client] add physicalAddress as part of connection pool key (#22196)
This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e2f94dc98db [improve][client] add physicalAddress as part of connection pool key (#22196)
e2f94dc98db is described below
commit e2f94dc98dbecb4dc401ba837c54f497ca9d896f
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Tue Mar 5 15:48:24 2024 -0800
[improve][client] add physicalAddress as part of connection pool key (#22196)
---
.../impl/AutoCloseUselessClientConSupports.java | 15 +----
.../apache/pulsar/client/impl/ConnectionPool.java | 77 ++++++++++------------
2 files changed, 34 insertions(+), 58 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java
index e03b1709137..c9f478969a6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java
@@ -18,19 +18,14 @@
*/
package org.apache.pulsar.client.impl;
-import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashSet;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -71,16 +66,8 @@ public class AutoCloseUselessClientConSupports extends MultiBrokerBaseTest {
protected void trigReleaseConnection(PulsarClientImpl pulsarClient)
throws InterruptedException, NoSuchFieldException, IllegalAccessException {
// Wait for every request has been response.
- Field field = ConnectionPool.class.getDeclaredField("pool");
- field.setAccessible(true);
- ConcurrentHashMap<InetSocketAddress,ConcurrentMap<Integer,
- CompletableFuture<ClientCnx>>> pool =
- (ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer,
- CompletableFuture<ClientCnx>>>) field.get(pulsarClient.getCnxPool());
- final List<CompletableFuture<ClientCnx>> clientCnxWrapList =
- pool.values().stream().flatMap(c -> c.values().stream()).collect(Collectors.toList());
Awaitility.waitAtMost(Duration.ofSeconds(5)).until(() -> {
- for (CompletableFuture<ClientCnx> clientCnxWrapFuture : clientCnxWrapList){
+ for (CompletableFuture<ClientCnx> clientCnxWrapFuture : pulsarClient.getCnxPool().getConnections()){
if (!clientCnxWrapFuture.isDone()){
continue;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 9750911b37c..850e805067d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import lombok.Value;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
@@ -64,7 +65,7 @@ public class ConnectionPool implements AutoCloseable {
public static final int IDLE_DETECTION_INTERVAL_SECONDS_MIN = 60;
- protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
+ protected final ConcurrentMap<Key, CompletableFuture<ClientCnx>> pool;
private final Bootstrap bootstrap;
private final PulsarChannelInitializer channelInitializerHandler;
@@ -87,6 +88,14 @@ public class ConnectionPool implements AutoCloseable {
/** Async release useless connections task. **/
private ScheduledFuture asyncReleaseUselessConnectionsTask;
+
+ @Value
+ private static class Key {
+ InetSocketAddress logicalAddress;
+ InetSocketAddress physicalAddress;
+ int randomKey;
+ }
+
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
}
@@ -185,7 +194,7 @@ public class ConnectionPool implements AutoCloseable {
}
void closeAllConnections() {
- pool.values().forEach(map -> map.values().forEach(future -> {
+ pool.values().forEach(future -> {
if (future.isDone()) {
if (!future.isCompletedExceptionally()) {
// Connection was already created successfully, the join will not throw any exception
@@ -198,10 +207,9 @@ public class ConnectionPool implements AutoCloseable {
// succeed
future.thenAccept(ClientCnx::close);
}
- }));
+ });
}
-
- /**
+ /**
* Get a connection from the pool.
* <p>
* The connection can either be created or be coming from the pool itself.
@@ -222,51 +230,44 @@ public class ConnectionPool implements AutoCloseable {
InetSocketAddress physicalAddress, final int randomKey) {
if (maxConnectionsPerHosts == 0) {
// Disable pooling
- return createConnection(logicalAddress, physicalAddress, -1);
+ return createConnection(new Key(logicalAddress, physicalAddress, -1));
}
-
- final ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
- pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>());
- CompletableFuture<ClientCnx> completableFuture = innerPool
- .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
+ Key key = new Key(logicalAddress, physicalAddress, randomKey);
+ CompletableFuture<ClientCnx> completableFuture = pool.computeIfAbsent(key, k -> createConnection(key));
if (completableFuture.isCompletedExceptionally()) {
// we cannot cache a failed connection, so we remove it from the pool
// there is a race condition in which
// cleanupConnection is called before caching this result
// and so the clean up fails
- cleanupConnection(logicalAddress, randomKey, completableFuture);
+ pool.remove(key, completableFuture);
return completableFuture;
}
return completableFuture.thenCompose(clientCnx -> {
// If connection already release, create a new one.
if (clientCnx.getIdleState().isReleased()) {
- cleanupConnection(logicalAddress, randomKey, completableFuture);
- return innerPool
- .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
+ pool.remove(key, completableFuture);
+ return pool.computeIfAbsent(key, k -> createConnection(key));
}
// Try use exists connection.
if (clientCnx.getIdleState().tryMarkUsingAndClearIdleTime()) {
return CompletableFuture.completedFuture(clientCnx);
} else {
// If connection already release, create a new one.
- cleanupConnection(logicalAddress, randomKey, completableFuture);
- return innerPool
- .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
+ pool.remove(key, completableFuture);
+ return pool.computeIfAbsent(key, k -> createConnection(key));
}
});
}
- private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalAddress,
- InetSocketAddress physicalAddress, int connectionKey) {
+ private CompletableFuture<ClientCnx> createConnection(Key key) {
if (log.isDebugEnabled()) {
- log.debug("Connection for {} not found in cache", logicalAddress);
+ log.debug("Connection for {} not found in cache", key.logicalAddress);
}
final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<>();
-
// Trigger async connect to broker
- createConnection(logicalAddress, physicalAddress).thenAccept(channel -> {
+ createConnection(key.logicalAddress, key.physicalAddress).thenAccept(channel -> {
log.info("[{}] Connected to server", channel);
channel.closeFuture().addListener(v -> {
@@ -274,7 +275,7 @@ public class ConnectionPool implements AutoCloseable {
if (log.isDebugEnabled()) {
log.debug("Removing closed connection from pool: {}", v);
}
- cleanupConnection(logicalAddress, connectionKey, cnxFuture);
+ pool.remove(key, cnxFuture);
});
// We are connected to broker, but need to wait until the connect/connected handshake is
@@ -300,14 +301,14 @@ public class ConnectionPool implements AutoCloseable {
// CompletableFuture is cached into the "pool" map,
// it is not enough to clean it here, we need to clean it
// in the "pool" map when the CompletableFuture is cached
- cleanupConnection(logicalAddress, connectionKey, cnxFuture);
+ pool.remove(key, cnxFuture);
cnx.ctx().close();
return null;
});
}).exceptionally(exception -> {
eventLoopGroup.execute(() -> {
- log.warn("Failed to open connection to {} : {}", physicalAddress, exception.getMessage());
- cleanupConnection(logicalAddress, connectionKey, cnxFuture);
+ log.warn("Failed to open connection to {} : {}", key.physicalAddress, exception.getMessage());
+ pool.remove(key, cnxFuture);
cnxFuture.completeExceptionally(new PulsarClientException(exception));
});
return null;
@@ -439,17 +440,9 @@ public class ConnectionPool implements AutoCloseable {
}
}
- private void cleanupConnection(InetSocketAddress address, int connectionKey,
- CompletableFuture<ClientCnx> connectionFuture) {
- ConcurrentMap<Integer, CompletableFuture<ClientCnx>> map = pool.get(address);
- if (map != null) {
- map.remove(connectionKey, connectionFuture);
- }
- }
-
@VisibleForTesting
int getPoolSize() {
- return pool.values().stream().mapToInt(Map::size).sum();
+ return pool.size();
}
private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);
@@ -459,11 +452,8 @@ public class ConnectionPool implements AutoCloseable {
return;
}
List<Runnable> releaseIdleConnectionTaskList = new ArrayList<>();
- for (Map.Entry<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> entry :
- pool.entrySet()){
- ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool = entry.getValue();
- for (Map.Entry<Integer, CompletableFuture<ClientCnx>> entry0 : innerPool.entrySet()) {
- CompletableFuture<ClientCnx> future = entry0.getValue();
+ for (Map.Entry<Key, CompletableFuture<ClientCnx>> entry : pool.entrySet()) {
+ CompletableFuture<ClientCnx> future = entry.getValue();
// Ensure connection has been connected.
if (!future.isDone()) {
continue;
@@ -481,18 +471,17 @@ public class ConnectionPool implements AutoCloseable {
if (clientCnx.getIdleState().isReleasing()) {
releaseIdleConnectionTaskList.add(() -> {
if (clientCnx.getIdleState().tryMarkReleasedAndCloseConnection()) {
- cleanupConnection(entry.getKey(), entry0.getKey(), future);
+ pool.remove(entry.getKey(), future);
}
});
}
}
- }
// Do release idle connections.
releaseIdleConnectionTaskList.forEach(Runnable::run);
}
public Set<CompletableFuture<ClientCnx>> getConnections() {
return Collections.unmodifiableSet(
- pool.values().stream().flatMap(n -> n.values().stream()).collect(Collectors.toSet()));
+ pool.values().stream().collect(Collectors.toSet()));
}
}