You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/08/31 01:56:45 UTC

[pulsar] 01/02: Fixed Proxy leaking outbound connections (#11848)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6486a55ca735e01af0b1b2af1b8ec9e9d035cf19
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Aug 30 18:53:22 2021 -0700

    Fixed Proxy leaking outbound connections (#11848)
---
 .../pulsar/proxy/server/ProxyConnection.java       | 23 ++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 3fdb0d5e..25f88102 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -69,6 +69,7 @@ import lombok.Getter;
 public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
     // ConnectionPool is used by the proxy to issue lookup requests
     private PulsarClientImpl client;
+    private ConnectionPool connectionPool;
     private ProxyService service;
     private Authentication clientAuthentication;
     AuthenticationDataSource authenticationData;
@@ -159,6 +160,14 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         }
         service.getClientCnxs().remove(this);
         LOG.info("[{}] Connection closed", remoteAddress);
+
+        if (connectionPool != null) {
+            try {
+                connectionPool.close();
+            } catch (Exception e) {
+                LOG.error("Failed to close connection pool {}", e.getMessage(), e);
+            }
+        }
     }
 
     @Override
@@ -297,9 +306,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
 
             // authn not enabled, complete
             if (!service.getConfiguration().isAuthenticationEnabled()) {
-                this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(),
-                    new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
-                        () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)), service.getTimer());
+                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+                        () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion));
+                this.client =
+                        new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
 
                 completeConnect();
                 return;
@@ -434,9 +444,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
 
     private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData,
             final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
-        return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
-                new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf,
-                        service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)), service.getTimer());
+        this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+                () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
+                        clientAuthMethod, protocolVersion));
+        return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
     }
 
     private static int getProtocolVersionToAdvertise(CommandConnect connect) {