You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/20 08:39:50 UTC

[pulsar] branch branch-2.6 updated: [Issue 6319][Pulsar client] connection leak fix (#6524) (#8642)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 30db702  [Issue 6319][Pulsar client] connection leak fix (#6524) (#8642)
30db702 is described below

commit 30db70267e427f6c5aa9ab95ef88e9cf7872f01f
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Nov 20 16:39:22 2020 +0800

    [Issue 6319][Pulsar client] connection leak fix (#6524) (#8642)
    
    cherry-pick from 0c9c9fccbbdebd730acc04a759786babe4ce967a #6524
---
 .../pulsar/client/impl/ConnectionPoolTest.java     |  65 +++++--
 .../client/impl/BinaryProtoLookupService.java      | 187 ++++++++++++---------
 .../pulsar/client/impl/ConnectionHandler.java      |   1 +
 .../apache/pulsar/client/impl/ConnectionPool.java  |  18 ++
 .../pulsar/proxy/server/LookupProxyHandler.java    | 103 +++++++-----
 5 files changed, 234 insertions(+), 140 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
index 32bb772..be08169 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
@@ -18,22 +18,23 @@
  */
 package org.apache.pulsar.client.impl;
 
-import java.net.InetAddress;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
+import com.google.common.collect.Lists;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-
-import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.DefaultThreadFactory;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
 
 public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
 
@@ -64,9 +65,7 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
         result.add(InetAddress.getByName("127.0.0.1"));
         Mockito.when(pool.resolveName("non-existing-dns-name")).thenReturn(CompletableFuture.completedFuture(result));
 
-        client.newProducer()
-                .topic("persistent://sample/standalone/ns/my-topic")
-                .create();
+        client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create();
 
         client.close();
         eventLoop.shutdownGracefully();
@@ -95,4 +94,48 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
 
         eventLoop.shutdownGracefully();
     }
+
+    @Test
+    public void testNoConnectionPool() throws Exception {
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setConnectionsPerBroker(0);
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, new DefaultThreadFactory("test"));
+        ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
+
+        InetSocketAddress brokerAddress =
+            InetSocketAddress.createUnresolved("127.0.0.1", pulsar.getBrokerListenPort().get());
+        IntStream.range(1, 5).forEach(i -> {
+            pool.getConnection(brokerAddress).thenAccept(cnx -> {
+                Assert.assertTrue(cnx.channel().isActive());
+                pool.releaseConnection(cnx);
+                Assert.assertTrue(cnx.channel().isActive());
+            });
+        });
+        Assert.assertEquals(pool.getPoolSize(), 0);
+
+        pool.closeAllConnections();
+        pool.close();
+    }
+
+    @Test
+    public void testEnableConnectionPool() throws Exception {
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setConnectionsPerBroker(5);
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, new DefaultThreadFactory("test"));
+        ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
+
+        InetSocketAddress brokerAddress =
+            InetSocketAddress.createUnresolved("127.0.0.1", pulsar.getBrokerListenPort().get());
+        IntStream.range(1, 10).forEach(i -> {
+            pool.getConnection(brokerAddress).thenAccept(cnx -> {
+                Assert.assertTrue(cnx.channel().isActive());
+                pool.releaseConnection(cnx);
+                Assert.assertTrue(cnx.channel().isActive());
+            });
+        });
+        Assert.assertTrue(pool.getPoolSize() <= 5 && pool.getPoolSize() > 0);
+
+        pool.closeAllConnections();
+        pool.close();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index e64e161..afa0734 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -109,65 +109,65 @@ public class BinaryProtoLookupService implements LookupService {
 
         client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
             long requestId = client.newRequestId();
-            ByteBuf request = Commands.newLookup(topicName.toString(), this.listenerName, authoritative, requestId);
-            clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> {
-                URI uri = null;
-                try {
-                    // (1) build response broker-address
-                    if (useTls) {
-                        uri = new URI(lookupDataResult.brokerUrlTls);
-                    } else {
-                        String serviceUrl = lookupDataResult.brokerUrl;
-                        uri = new URI(serviceUrl);
+            ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId);
+            clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
+                if (t != null) {
+                    // lookup failed
+                    log.warn("[{}] failed to send lookup request : {}", topicName.toString(), t.getMessage());
+                    if (log.isDebugEnabled()) {
+                        log.warn("[{}] Lookup response exception: {}", topicName.toString(), t);
                     }
 
-                    InetSocketAddress responseBrokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
-
-                    // (2) redirect to given address if response is: redirect
-                    if (lookupDataResult.redirect) {
-                        findBroker(responseBrokerAddress, lookupDataResult.authoritative, topicName, redirectCount + 1)
-                                .thenAccept(addressPair -> {
-                                    addressFuture.complete(addressPair);
-                                }).exceptionally((lookupException) -> {
-                                    // lookup failed
-                                    if (redirectCount > 0) {
-                                        if (log.isDebugEnabled()) {
-                                            log.debug("[{}] lookup redirection failed ({}) : {}", topicName.toString(),
-                                                    redirectCount, lookupException.getMessage());
-                                        }
-                                    } else {
-                                        log.warn("[{}] lookup failed : {}", topicName.toString(),
-                                                lookupException.getMessage(), lookupException);
+                    addressFuture.completeExceptionally(t);
+                } else {
+                    URI uri = null;
+                    try {
+                        // (1) build response broker-address
+                        if (useTls) {
+                            uri = new URI(r.brokerUrlTls);
+                        } else {
+                            String serviceUrl = r.brokerUrl;
+                            uri = new URI(serviceUrl);
+                        }
+
+                        InetSocketAddress responseBrokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
+
+                        // (2) redirect to given address if response is: redirect
+                        if (r.redirect) {
+                            findBroker(responseBrokerAddress, r.authoritative, topicName, redirectCount + 1)
+                                .thenAccept(addressFuture::complete).exceptionally((lookupException) -> {
+                                // lookup failed
+                                if (redirectCount > 0) {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("[{}] lookup redirection failed ({}) : {}", topicName.toString(),
+                                                redirectCount, lookupException.getMessage());
                                     }
-                                    addressFuture.completeExceptionally(lookupException);
-                                    return null;
-                                });
-                    } else {
-                        // (3) received correct broker to connect
-                        if (lookupDataResult.proxyThroughServiceUrl) {
-                            // Connect through proxy
-                            addressFuture.complete(Pair.of(responseBrokerAddress, socketAddress));
+                                } else {
+                                    log.warn("[{}] lookup failed : {}", topicName.toString(),
+                                            lookupException.getMessage(), lookupException);
+                                }
+                                addressFuture.completeExceptionally(lookupException);
+                                return null;
+                            });
                         } else {
-                            // Normal result with direct connection to broker
-                            addressFuture.complete(Pair.of(responseBrokerAddress, responseBrokerAddress));
+                            // (3) received correct broker to connect
+                            if (r.proxyThroughServiceUrl) {
+                                // Connect through proxy
+                                addressFuture.complete(Pair.of(responseBrokerAddress, socketAddress));
+                            } else {
+                                // Normal result with direct connection to broker
+                                addressFuture.complete(Pair.of(responseBrokerAddress, responseBrokerAddress));
+                            }
                         }
-                    }
 
-                } catch (Exception parseUrlException) {
-                    // Failed to parse url
-                    log.warn("[{}] invalid url {} : {}", topicName.toString(), uri, parseUrlException.getMessage(),
+                    } catch (Exception parseUrlException) {
+                        // Failed to parse url
+                        log.warn("[{}] invalid url {} : {}", topicName.toString(), uri, parseUrlException.getMessage(),
                             parseUrlException);
-                    addressFuture.completeExceptionally(parseUrlException);
-                }
-            }).exceptionally((sendException) -> {
-                // lookup failed
-                log.warn("[{}] failed to send lookup request : {}", topicName.toString(), sendException.getMessage());
-                if (log.isDebugEnabled()) {
-                    log.warn("[{}] Lookup response exception: {}", topicName.toString(), sendException);
+                        addressFuture.completeExceptionally(parseUrlException);
+                    }
                 }
-
-                addressFuture.completeExceptionally(sendException);
-                return null;
+                client.getCnxPool().releaseConnection(clientCnx);
             });
         }).exceptionally(connectionException -> {
             addressFuture.completeExceptionally(connectionException);
@@ -184,20 +184,22 @@ public class BinaryProtoLookupService implements LookupService {
         client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
             long requestId = client.newRequestId();
             ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
-            clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> {
-                try {
-                    partitionFuture.complete(new PartitionedTopicMetadata(lookupDataResult.partitions));
-                } catch (Exception e) {
-                    partitionFuture.completeExceptionally(new PulsarClientException.LookupException(
-                        format("Failed to parse partition-response redirect=%s, topic=%s, partitions with %s",
-                            lookupDataResult.redirect, topicName.toString(), lookupDataResult.partitions,
-                            e.getMessage())));
+            clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
+                if (t != null) {
+                    log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
+                        t.getMessage(), t);
+                    partitionFuture.completeExceptionally(t);
+                } else {
+                    try {
+                        partitionFuture.complete(new PartitionedTopicMetadata(r.partitions));
+                    } catch (Exception e) {
+                        partitionFuture.completeExceptionally(new PulsarClientException.LookupException(
+                            format("Failed to parse partition-response redirect=%s, topic=%s, partitions with %s",
+                                r.redirect, topicName.toString(), r.partitions,
+                                e.getMessage())));
+                    }
                 }
-            }).exceptionally((e) -> {
-                log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
-                        e.getCause().getMessage(), e);
-                partitionFuture.completeExceptionally(e);
-                return null;
+                client.getCnxPool().releaseConnection(clientCnx);
             });
         }).exceptionally(connectionException -> {
             partitionFuture.completeExceptionally(connectionException);
@@ -215,12 +217,29 @@ public class BinaryProtoLookupService implements LookupService {
 
     @Override
     public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) {
-        return client.getCnxPool().getConnection(serviceNameResolver.resolveHost()).thenCompose(clientCnx -> {
+        InetSocketAddress socketAddress = serviceNameResolver.resolveHost();
+        CompletableFuture<Optional<SchemaInfo>> schemaFuture = new CompletableFuture<>();
+
+        client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
             long requestId = client.newRequestId();
             ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(),
-                    Optional.ofNullable(BytesSchemaVersion.of(version)));
-            return clientCnx.sendGetSchema(request, requestId);
+                Optional.ofNullable(BytesSchemaVersion.of(version)));
+            clientCnx.sendGetSchema(request, requestId).whenComplete((r, t) -> {
+                if (t != null) {
+                    log.warn("[{}] failed to get schema : {}", topicName.toString(),
+                        t.getMessage(), t);
+                    schemaFuture.completeExceptionally(t);
+                } else {
+                    schemaFuture.complete(r);
+                }
+                client.getCnxPool().releaseConnection(clientCnx);
+            });
+        }).exceptionally(ex -> {
+            schemaFuture.completeExceptionally(ex);
+            return null;
         });
+
+        return schemaFuture;
     }
 
     public String getServiceUrl() {
@@ -252,24 +271,26 @@ public class BinaryProtoLookupService implements LookupService {
             ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
                 namespace.toString(), requestId, mode);
 
-            clientCnx.newGetTopicsOfNamespace(request, requestId).thenAccept(topicsList -> {
-                if (log.isDebugEnabled()) {
-                    log.debug("[namespace: {}] Success get topics list in request: {}", namespace.toString(), requestId);
-                }
-
-                // do not keep partition part of topic name
-                List<String> result = Lists.newArrayList();
-                topicsList.forEach(topic -> {
-                    String filtered = TopicName.get(topic).getPartitionedTopicName();
-                    if (!result.contains(filtered)) {
-                        result.add(filtered);
+            clientCnx.newGetTopicsOfNamespace(request, requestId).whenComplete((r, t) -> {
+                if (t != null) {
+                    topicsFuture.completeExceptionally(t);
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[namespace: {}] Success get topics list in request: {}", namespace.toString(), requestId);
                     }
-                });
 
-                topicsFuture.complete(result);
-            }).exceptionally((e) -> {
-                topicsFuture.completeExceptionally(e);
-                return null;
+                    // do not keep partition part of topic name
+                    List<String> result = Lists.newArrayList();
+                    r.forEach(topic -> {
+                        String filtered = TopicName.get(topic).getPartitionedTopicName();
+                        if (!result.contains(filtered)) {
+                            result.add(filtered);
+                        }
+                    });
+
+                    topicsFuture.complete(result);
+                }
+                client.getCnxPool().releaseConnection(clientCnx);
             });
         }).exceptionally((e) -> {
             long nextDelay = Math.min(backoff.next(), remainingTime.get());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index e035283..d168152 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -110,6 +110,7 @@ public class ConnectionHandler {
 
     @VisibleForTesting
     public void connectionClosed(ClientCnx cnx) {
+        state.client.getCnxPool().releaseConnection(cnx);
         if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
             if (!isValidStateForReconnection()) {
                 log.info("[{}] [{}] Ignoring reconnection request (state: {})", state.topic, state.getHandlerName(), state.getState());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index afccf40..869c642 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -39,6 +39,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -304,6 +305,18 @@ public class ConnectionPool implements Closeable {
         }
     }
 
+    public void releaseConnection(ClientCnx cnx) {
+        if (maxConnectionsPerHosts == 0) {
+            //Disable pooling
+            if (cnx.channel().isActive()) {
+                if(log.isDebugEnabled()) {
+                    log.debug("close connection due to pooling disabled.");
+                }
+                cnx.close();
+            }
+        }
+    }
+
     @Override
     public void close() throws IOException {
         try {
@@ -322,6 +335,11 @@ public class ConnectionPool implements Closeable {
         }
     }
 
+    @VisibleForTesting
+    int getPoolSize() {
+        return pool.values().stream().mapToInt(Map::size).sum();
+    }
+
     public static int signSafeMod(long dividend, int divisor) {
         int mod = (int) (dividend % divisor);
         if (mod < 0) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index e560946..8782793 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -157,32 +157,35 @@ public class LookupProxyHandler {
             long requestId = proxyConnection.newRequestId();
             ByteBuf command;
             command = Commands.newLookup(topic, authoritative, requestId);
-            clientCnx.newLookup(command, requestId).thenAccept(result -> {
-                String brokerUrl = connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
-                if (result.redirect) {
-                    // Need to try the lookup again on a different broker
-                    performLookup(clientRequestId, topic, brokerUrl, result.authoritative, numberOfRetries - 1);
+
+            clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
+                if (t != null) {
+                    log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, t.getMessage());
+                    proxyConnection.ctx().writeAndFlush(
+                        Commands.newLookupErrorResponse(ServerError.ServiceNotReady, t.getMessage(), clientRequestId));
                 } else {
-                    // Reply the same address for both TLS non-TLS. The reason
-                    // is that whether we use TLS
-                    // and broker is independent of whether the client itself
-                    // uses TLS, but we need to force the
-                    // client
-                    // to use the appropriate target broker (and port) when it
-                    // will connect back.
-                    if (log.isDebugEnabled()) {
-                        log.debug(
+                    String brokerUrl = connectWithTLS ? r.brokerUrlTls : r.brokerUrl;
+                    if (r.redirect) {
+                        // Need to try the lookup again on a different broker
+                        performLookup(clientRequestId, topic, brokerUrl, r.authoritative, numberOfRetries - 1);
+                    } else {
+                        // Reply the same address for both TLS non-TLS. The reason
+                        // is that whether we use TLS
+                        // and broker is independent of whether the client itself
+                        // uses TLS, but we need to force the
+                        // client
+                        // to use the appropriate target broker (and port) when it
+                        // will connect back.
+                        if (log.isDebugEnabled()) {
+                            log.debug(
                                 "Successfully perform lookup '{}' for topic '{}' with clientReq Id '{}' and lookup-broker {}",
                                 addr, topic, clientRequestId, brokerUrl);
-                    }
-                    proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
+                        }
+                        proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
                             LookupType.Connect, clientRequestId, true /* this is coming from proxy */));
+                    }
                 }
-            }).exceptionally(ex -> {
-                log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, ex.getMessage());
-                proxyConnection.ctx().writeAndFlush(
-                        Commands.newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
-                return null;
+                proxyConnection.getConnectionPool().releaseConnection(clientCnx);
             });
         }).exceptionally(ex -> {
             // Failed to connect to backend broker
@@ -252,15 +255,17 @@ public class LookupProxyHandler {
                 long requestId = proxyConnection.newRequestId();
                 ByteBuf command;
                 command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
-                clientCnx.newLookup(command, requestId).thenAccept(lookupDataResult -> {
-                    proxyConnection.ctx().writeAndFlush(
-                            Commands.newPartitionMetadataResponse(lookupDataResult.partitions, clientRequestId));
-                }).exceptionally((ex) -> {
-                    log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
-                            ex.getCause().getMessage(), ex);
-                    proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
-                            ex.getMessage(), clientRequestId));
-                    return null;
+                clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
+                    if (t != null) {
+                        log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
+                            t.getMessage(), t);
+                        proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+                            t.getMessage(), clientRequestId));
+                    } else {
+                        proxyConnection.ctx().writeAndFlush(
+                            Commands.newPartitionMetadataResponse(r.partitions, clientRequestId));
+                    }
+                    proxyConnection.getConnectionPool().releaseConnection(clientCnx);
                 });
             }).exceptionally(ex -> {
                 // Failed to connect to backend broker
@@ -331,15 +336,18 @@ public class LookupProxyHandler {
             long requestId = proxyConnection.newRequestId();
             ByteBuf command;
             command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, requestId, mode);
-            clientCnx.newGetTopicsOfNamespace(command, requestId).thenAccept(topicList ->
-                proxyConnection.ctx().writeAndFlush(
-                    Commands.newGetTopicsOfNamespaceResponse(topicList, clientRequestId))
-            ).exceptionally(ex -> {
-                log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", clientAddress, namespaceName, ex.getMessage());
-                proxyConnection.ctx().writeAndFlush(
-                        Commands.newError(clientRequestId, ServerError.ServiceNotReady, ex.getMessage()));
-                return null;
+            clientCnx.newGetTopicsOfNamespace(command, requestId).whenComplete((r, t) -> {
+                if (t != null) {
+                    log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", clientAddress, namespaceName, t.getMessage());
+                    proxyConnection.ctx().writeAndFlush(
+                        Commands.newError(clientRequestId, ServerError.ServiceNotReady, t.getMessage()));
+                } else {
+                    proxyConnection.ctx().writeAndFlush(
+                        Commands.newGetTopicsOfNamespaceResponse(r, clientRequestId));
+                }
             });
+
+            proxyConnection.getConnectionPool().releaseConnection(clientCnx);
         }).exceptionally(ex -> {
             // Failed to connect to backend broker
             proxyConnection.ctx().writeAndFlush(
@@ -381,14 +389,17 @@ public class LookupProxyHandler {
             }
             command = Commands.newGetSchema(requestId, topic,
                     Optional.ofNullable(schemaVersion).map(BytesSchemaVersion::of));
-            clientCnx.sendGetRawSchema(command, requestId).thenAccept(response -> {
-                proxyConnection.ctx().writeAndFlush(
-                        Commands.newGetSchemaResponse(clientRequestId, response));
-            }).exceptionally(ex -> {
-                log.warn("[{}] Failed to get schema {}: {}", clientAddress, topic, ex);
-                proxyConnection.ctx().writeAndFlush(
-                        Commands.newError(clientRequestId, ServerError.ServiceNotReady, ex.getMessage()));
-                return null;
+            clientCnx.sendGetRawSchema(command, requestId).whenComplete((r, t) -> {
+                if (t != null) {
+                    log.warn("[{}] Failed to get schema {}: {}", clientAddress, topic, t);
+                    proxyConnection.ctx().writeAndFlush(
+                        Commands.newError(clientRequestId, ServerError.ServiceNotReady, t.getMessage()));
+                } else {
+                    proxyConnection.ctx().writeAndFlush(
+                        Commands.newGetSchemaResponse(clientRequestId, r));
+                }
+
+                proxyConnection.getConnectionPool().releaseConnection(clientCnx);
             });
         }).exceptionally(ex -> {
             // Failed to connect to backend broker