You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/08/31 01:56:45 UTC
[pulsar] 01/02: Fixed Proxy leaking oubound connections (#11848)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6486a55ca735e01af0b1b2af1b8ec9e9d035cf19
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Aug 30 18:53:22 2021 -0700
Fixed Proxy leaking oubound connections (#11848)
---
.../pulsar/proxy/server/ProxyConnection.java | 23 ++++++++++++++++------
1 file changed, 17 insertions(+), 6 deletions(-)
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 3fdb0d5e..25f88102 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -69,6 +69,7 @@ import lombok.Getter;
public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
// ConnectionPool is used by the proxy to issue lookup requests
private PulsarClientImpl client;
+ private ConnectionPool connectionPool;
private ProxyService service;
private Authentication clientAuthentication;
AuthenticationDataSource authenticationData;
@@ -159,6 +160,14 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
}
service.getClientCnxs().remove(this);
LOG.info("[{}] Connection closed", remoteAddress);
+
+ if (connectionPool != null) {
+ try {
+ connectionPool.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close connection pool {}", e.getMessage(), e);
+ }
+ }
}
@Override
@@ -297,9 +306,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
// authn not enabled, complete
if (!service.getConfiguration().isAuthenticationEnabled()) {
- this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(),
- new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
- () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)), service.getTimer());
+ this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+ () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion));
+ this.client =
+ new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
completeConnect();
return;
@@ -434,9 +444,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData,
final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
- return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
- new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf,
- service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)), service.getTimer());
+ this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+ () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
+ clientAuthMethod, protocolVersion));
+ return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
}
private static int getProtocolVersionToAdvertise(CommandConnect connect) {