You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/20 08:39:50 UTC
[pulsar] branch branch-2.6 updated: [Issue 6319][Pulsar client]
connection leak fix (#6524) (#8642)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 30db702 [Issue 6319][Pulsar client] connection leak fix (#6524) (#8642)
30db702 is described below
commit 30db70267e427f6c5aa9ab95ef88e9cf7872f01f
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Nov 20 16:39:22 2020 +0800
[Issue 6319][Pulsar client] connection leak fix (#6524) (#8642)
cherry-pick from 0c9c9fccbbdebd730acc04a759786babe4ce967a #6524
---
.../pulsar/client/impl/ConnectionPoolTest.java | 65 +++++--
.../client/impl/BinaryProtoLookupService.java | 187 ++++++++++++---------
.../pulsar/client/impl/ConnectionHandler.java | 1 +
.../apache/pulsar/client/impl/ConnectionPool.java | 18 ++
.../pulsar/proxy/server/LookupProxyHandler.java | 103 +++++++-----
5 files changed, 234 insertions(+), 140 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
index 32bb772..be08169 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
@@ -18,22 +18,23 @@
*/
package org.apache.pulsar.client.impl;
-import java.net.InetAddress;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
+import com.google.common.collect.Lists;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.google.common.collect.Lists;
-
-import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.DefaultThreadFactory;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
@@ -64,9 +65,7 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
result.add(InetAddress.getByName("127.0.0.1"));
Mockito.when(pool.resolveName("non-existing-dns-name")).thenReturn(CompletableFuture.completedFuture(result));
- client.newProducer()
- .topic("persistent://sample/standalone/ns/my-topic")
- .create();
+ client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create();
client.close();
eventLoop.shutdownGracefully();
@@ -95,4 +94,48 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
eventLoop.shutdownGracefully();
}
+
+ @Test
+ public void testNoConnectionPool() throws Exception { // Exercises get/release against the broker with connectionsPerBroker set to 0 and expects an empty pool.
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setConnectionsPerBroker(0); // presumably feeds ConnectionPool.maxConnectionsPerHosts, where 0 means "pooling disabled" — TODO confirm
+ EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, new DefaultThreadFactory("test"));
+ ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
+
+ InetSocketAddress brokerAddress =
+ InetSocketAddress.createUnresolved("127.0.0.1", pulsar.getBrokerListenPort().get());
+ IntStream.range(1, 5).forEach(i -> { // range(1, 5) is end-exclusive: 4 connection attempts
+ pool.getConnection(brokerAddress).thenAccept(cnx -> { // NOTE(review): the future returned by thenAccept is discarded, so an assertion failure in this callback would not fail the test — confirm
+ Assert.assertTrue(cnx.channel().isActive());
+ pool.releaseConnection(cnx);
+ Assert.assertTrue(cnx.channel().isActive()); // NOTE(review): releaseConnection closes active connections when pooling is disabled; verify this expectation
+ });
+ });
+ Assert.assertEquals(pool.getPoolSize(), 0); // evaluated right after the async calls are issued, without waiting for them
+
+ pool.closeAllConnections();
+ pool.close();
+ }
+
+ @Test
+ public void testEnableConnectionPool() throws Exception { // Exercises get/release with connectionsPerBroker set to 5 and expects between 1 and 5 pooled entries.
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setConnectionsPerBroker(5); // upper bound checked by the pool-size assertion below
+ EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, new DefaultThreadFactory("test"));
+ ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
+
+ InetSocketAddress brokerAddress =
+ InetSocketAddress.createUnresolved("127.0.0.1", pulsar.getBrokerListenPort().get());
+ IntStream.range(1, 10).forEach(i -> { // range(1, 10) is end-exclusive: 9 requests, more than the configured limit of 5
+ pool.getConnection(brokerAddress).thenAccept(cnx -> { // NOTE(review): the future returned by thenAccept is discarded, so an assertion failure in this callback would not fail the test — confirm
+ Assert.assertTrue(cnx.channel().isActive());
+ pool.releaseConnection(cnx); // releaseConnection only closes when maxConnectionsPerHosts == 0, so this should be a no-op here — assuming 5 maps to that field
+ Assert.assertTrue(cnx.channel().isActive());
+ });
+ });
+ Assert.assertTrue(pool.getPoolSize() <= 5 && pool.getPoolSize() > 0); // evaluated right after the async calls are issued, without waiting for them
+
+ pool.closeAllConnections();
+ pool.close();
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index e64e161..afa0734 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -109,65 +109,65 @@ public class BinaryProtoLookupService implements LookupService {
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = client.newRequestId();
- ByteBuf request = Commands.newLookup(topicName.toString(), this.listenerName, authoritative, requestId);
- clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> {
- URI uri = null;
- try {
- // (1) build response broker-address
- if (useTls) {
- uri = new URI(lookupDataResult.brokerUrlTls);
- } else {
- String serviceUrl = lookupDataResult.brokerUrl;
- uri = new URI(serviceUrl);
+ ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId);
+ clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
+ if (t != null) {
+ // lookup failed
+ log.warn("[{}] failed to send lookup request : {}", topicName.toString(), t.getMessage());
+ if (log.isDebugEnabled()) {
+ log.warn("[{}] Lookup response exception: {}", topicName.toString(), t);
}
- InetSocketAddress responseBrokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
-
- // (2) redirect to given address if response is: redirect
- if (lookupDataResult.redirect) {
- findBroker(responseBrokerAddress, lookupDataResult.authoritative, topicName, redirectCount + 1)
- .thenAccept(addressPair -> {
- addressFuture.complete(addressPair);
- }).exceptionally((lookupException) -> {
- // lookup failed
- if (redirectCount > 0) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] lookup redirection failed ({}) : {}", topicName.toString(),
- redirectCount, lookupException.getMessage());
- }
- } else {
- log.warn("[{}] lookup failed : {}", topicName.toString(),
- lookupException.getMessage(), lookupException);
+ addressFuture.completeExceptionally(t);
+ } else {
+ URI uri = null;
+ try {
+ // (1) build response broker-address
+ if (useTls) {
+ uri = new URI(r.brokerUrlTls);
+ } else {
+ String serviceUrl = r.brokerUrl;
+ uri = new URI(serviceUrl);
+ }
+
+ InetSocketAddress responseBrokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
+
+ // (2) redirect to given address if response is: redirect
+ if (r.redirect) {
+ findBroker(responseBrokerAddress, r.authoritative, topicName, redirectCount + 1)
+ .thenAccept(addressFuture::complete).exceptionally((lookupException) -> {
+ // lookup failed
+ if (redirectCount > 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] lookup redirection failed ({}) : {}", topicName.toString(),
+ redirectCount, lookupException.getMessage());
}
- addressFuture.completeExceptionally(lookupException);
- return null;
- });
- } else {
- // (3) received correct broker to connect
- if (lookupDataResult.proxyThroughServiceUrl) {
- // Connect through proxy
- addressFuture.complete(Pair.of(responseBrokerAddress, socketAddress));
+ } else {
+ log.warn("[{}] lookup failed : {}", topicName.toString(),
+ lookupException.getMessage(), lookupException);
+ }
+ addressFuture.completeExceptionally(lookupException);
+ return null;
+ });
} else {
- // Normal result with direct connection to broker
- addressFuture.complete(Pair.of(responseBrokerAddress, responseBrokerAddress));
+ // (3) received correct broker to connect
+ if (r.proxyThroughServiceUrl) {
+ // Connect through proxy
+ addressFuture.complete(Pair.of(responseBrokerAddress, socketAddress));
+ } else {
+ // Normal result with direct connection to broker
+ addressFuture.complete(Pair.of(responseBrokerAddress, responseBrokerAddress));
+ }
}
- }
- } catch (Exception parseUrlException) {
- // Failed to parse url
- log.warn("[{}] invalid url {} : {}", topicName.toString(), uri, parseUrlException.getMessage(),
+ } catch (Exception parseUrlException) {
+ // Failed to parse url
+ log.warn("[{}] invalid url {} : {}", topicName.toString(), uri, parseUrlException.getMessage(),
parseUrlException);
- addressFuture.completeExceptionally(parseUrlException);
- }
- }).exceptionally((sendException) -> {
- // lookup failed
- log.warn("[{}] failed to send lookup request : {}", topicName.toString(), sendException.getMessage());
- if (log.isDebugEnabled()) {
- log.warn("[{}] Lookup response exception: {}", topicName.toString(), sendException);
+ addressFuture.completeExceptionally(parseUrlException);
+ }
}
-
- addressFuture.completeExceptionally(sendException);
- return null;
+ client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(connectionException -> {
addressFuture.completeExceptionally(connectionException);
@@ -184,20 +184,22 @@ public class BinaryProtoLookupService implements LookupService {
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
- clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> {
- try {
- partitionFuture.complete(new PartitionedTopicMetadata(lookupDataResult.partitions));
- } catch (Exception e) {
- partitionFuture.completeExceptionally(new PulsarClientException.LookupException(
- format("Failed to parse partition-response redirect=%s, topic=%s, partitions with %s",
- lookupDataResult.redirect, topicName.toString(), lookupDataResult.partitions,
- e.getMessage())));
+ clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
+ if (t != null) {
+ log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
+ t.getMessage(), t);
+ partitionFuture.completeExceptionally(t);
+ } else {
+ try {
+ partitionFuture.complete(new PartitionedTopicMetadata(r.partitions));
+ } catch (Exception e) {
+ partitionFuture.completeExceptionally(new PulsarClientException.LookupException(
+ format("Failed to parse partition-response redirect=%s, topic=%s, partitions with %s",
+ r.redirect, topicName.toString(), r.partitions,
+ e.getMessage())));
+ }
}
- }).exceptionally((e) -> {
- log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
- e.getCause().getMessage(), e);
- partitionFuture.completeExceptionally(e);
- return null;
+ client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(connectionException -> {
partitionFuture.completeExceptionally(connectionException);
@@ -215,12 +217,29 @@ public class BinaryProtoLookupService implements LookupService {
@Override
public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) {
- return client.getCnxPool().getConnection(serviceNameResolver.resolveHost()).thenCompose(clientCnx -> {
+ InetSocketAddress socketAddress = serviceNameResolver.resolveHost(); // Sends a get-schema request for topicName/version and returns a future for the result.
+ CompletableFuture<Optional<SchemaInfo>> schemaFuture = new CompletableFuture<>(); // returned to the caller; completed from the callbacks below
+
+ client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(),
- Optional.ofNullable(BytesSchemaVersion.of(version)));
- return clientCnx.sendGetSchema(request, requestId);
+ Optional.ofNullable(BytesSchemaVersion.of(version)));
+ clientCnx.sendGetSchema(request, requestId).whenComplete((r, t) -> { // r = response, t = failure; whenComplete runs on both outcomes
+ if (t != null) {
+ log.warn("[{}] failed to get schema : {}", topicName.toString(),
+ t.getMessage(), t);
+ schemaFuture.completeExceptionally(t);
+ } else {
+ schemaFuture.complete(r);
+ }
+ client.getCnxPool().releaseConnection(clientCnx); // release on success and on failure alike, after the request has finished
+ });
+ }).exceptionally(ex -> { // reached when obtaining the connection fails (or the thenAccept body throws); no release call on this path
+ schemaFuture.completeExceptionally(ex);
+ return null;
});
+
+ return schemaFuture;
}
public String getServiceUrl() {
@@ -252,24 +271,26 @@ public class BinaryProtoLookupService implements LookupService {
ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
namespace.toString(), requestId, mode);
- clientCnx.newGetTopicsOfNamespace(request, requestId).thenAccept(topicsList -> {
- if (log.isDebugEnabled()) {
- log.debug("[namespace: {}] Success get topics list in request: {}", namespace.toString(), requestId);
- }
-
- // do not keep partition part of topic name
- List<String> result = Lists.newArrayList();
- topicsList.forEach(topic -> {
- String filtered = TopicName.get(topic).getPartitionedTopicName();
- if (!result.contains(filtered)) {
- result.add(filtered);
+ clientCnx.newGetTopicsOfNamespace(request, requestId).whenComplete((r, t) -> {
+ if (t != null) {
+ topicsFuture.completeExceptionally(t);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[namespace: {}] Success get topics list in request: {}", namespace.toString(), requestId);
}
- });
- topicsFuture.complete(result);
- }).exceptionally((e) -> {
- topicsFuture.completeExceptionally(e);
- return null;
+ // do not keep partition part of topic name
+ List<String> result = Lists.newArrayList();
+ r.forEach(topic -> {
+ String filtered = TopicName.get(topic).getPartitionedTopicName();
+ if (!result.contains(filtered)) {
+ result.add(filtered);
+ }
+ });
+
+ topicsFuture.complete(result);
+ }
+ client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally((e) -> {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index e035283..d168152 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -110,6 +110,7 @@ public class ConnectionHandler {
@VisibleForTesting
public void connectionClosed(ClientCnx cnx) {
+ state.client.getCnxPool().releaseConnection(cnx);
if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
if (!isValidStateForReconnection()) {
log.info("[{}] [{}] Ignoring reconnection request (state: {})", state.topic, state.getHandlerName(), state.getState());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index afccf40..869c642 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -39,6 +39,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -304,6 +305,18 @@ public class ConnectionPool implements Closeable {
}
}
+ public void releaseConnection(ClientCnx cnx) { // Called when a caller is done with cnx; closes it only when pooling is disabled.
+ if (maxConnectionsPerHosts == 0) {
+ // Pooling disabled (limit of 0): close the connection instead of keeping it
+ if (cnx.channel().isActive()) { // nothing to do for a channel that is already inactive
+ if(log.isDebugEnabled()) {
+ log.debug("close connection due to pooling disabled.");
+ }
+ cnx.close();
+ }
+ } // for any other limit this method does nothing
+ }
+
@Override
public void close() throws IOException {
try {
@@ -322,6 +335,11 @@ public class ConnectionPool implements Closeable {
}
}
+ @VisibleForTesting
+ int getPoolSize() { // Returns the total number of entries across all inner maps held in `pool`.
+ return pool.values().stream().mapToInt(Map::size).sum(); // each value of `pool` is itself a Map; sum their sizes
+ }
+
public static int signSafeMod(long dividend, int divisor) {
int mod = (int) (dividend % divisor);
if (mod < 0) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index e560946..8782793 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -157,32 +157,35 @@ public class LookupProxyHandler {
long requestId = proxyConnection.newRequestId();
ByteBuf command;
command = Commands.newLookup(topic, authoritative, requestId);
- clientCnx.newLookup(command, requestId).thenAccept(result -> {
- String brokerUrl = connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
- if (result.redirect) {
- // Need to try the lookup again on a different broker
- performLookup(clientRequestId, topic, brokerUrl, result.authoritative, numberOfRetries - 1);
+
+ clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
+ if (t != null) {
+ log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, t.getMessage());
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newLookupErrorResponse(ServerError.ServiceNotReady, t.getMessage(), clientRequestId));
} else {
- // Reply the same address for both TLS non-TLS. The reason
- // is that whether we use TLS
- // and broker is independent of whether the client itself
- // uses TLS, but we need to force the
- // client
- // to use the appropriate target broker (and port) when it
- // will connect back.
- if (log.isDebugEnabled()) {
- log.debug(
+ String brokerUrl = connectWithTLS ? r.brokerUrlTls : r.brokerUrl;
+ if (r.redirect) {
+ // Need to try the lookup again on a different broker
+ performLookup(clientRequestId, topic, brokerUrl, r.authoritative, numberOfRetries - 1);
+ } else {
+ // Reply the same address for both TLS non-TLS. The reason
+ // is that whether we use TLS
+ // and broker is independent of whether the client itself
+ // uses TLS, but we need to force the
+ // client
+ // to use the appropriate target broker (and port) when it
+ // will connect back.
+ if (log.isDebugEnabled()) {
+ log.debug(
"Successfully perform lookup '{}' for topic '{}' with clientReq Id '{}' and lookup-broker {}",
addr, topic, clientRequestId, brokerUrl);
- }
- proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
+ }
+ proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
LookupType.Connect, clientRequestId, true /* this is coming from proxy */));
+ }
}
- }).exceptionally(ex -> {
- log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, ex.getMessage());
- proxyConnection.ctx().writeAndFlush(
- Commands.newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
- return null;
+ proxyConnection.getConnectionPool().releaseConnection(clientCnx);
});
}).exceptionally(ex -> {
// Failed to connect to backend broker
@@ -252,15 +255,17 @@ public class LookupProxyHandler {
long requestId = proxyConnection.newRequestId();
ByteBuf command;
command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
- clientCnx.newLookup(command, requestId).thenAccept(lookupDataResult -> {
- proxyConnection.ctx().writeAndFlush(
- Commands.newPartitionMetadataResponse(lookupDataResult.partitions, clientRequestId));
- }).exceptionally((ex) -> {
- log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
- ex.getCause().getMessage(), ex);
- proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
- ex.getMessage(), clientRequestId));
- return null;
+ clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
+ if (t != null) {
+ log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
+ t.getMessage(), t);
+ proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+ t.getMessage(), clientRequestId));
+ } else {
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newPartitionMetadataResponse(r.partitions, clientRequestId));
+ }
+ proxyConnection.getConnectionPool().releaseConnection(clientCnx);
});
}).exceptionally(ex -> {
// Failed to connect to backend broker
@@ -331,15 +336,18 @@ public class LookupProxyHandler {
long requestId = proxyConnection.newRequestId();
ByteBuf command;
command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, requestId, mode);
- clientCnx.newGetTopicsOfNamespace(command, requestId).thenAccept(topicList ->
- proxyConnection.ctx().writeAndFlush(
- Commands.newGetTopicsOfNamespaceResponse(topicList, clientRequestId))
- ).exceptionally(ex -> {
- log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", clientAddress, namespaceName, ex.getMessage());
- proxyConnection.ctx().writeAndFlush(
- Commands.newError(clientRequestId, ServerError.ServiceNotReady, ex.getMessage()));
- return null;
+ clientCnx.newGetTopicsOfNamespace(command, requestId).whenComplete((r, t) -> { // relay the topic list, or the failure, to the proxied client
+     if (t != null) {
+         log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", clientAddress, namespaceName, t.getMessage());
+         proxyConnection.ctx().writeAndFlush(
+             Commands.newError(clientRequestId, ServerError.ServiceNotReady, t.getMessage()));
+     } else {
+         proxyConnection.ctx().writeAndFlush(
+             Commands.newGetTopicsOfNamespaceResponse(r, clientRequestId));
+     }
+     // Release only after the request completes: with pooling disabled, release closes the connection, so releasing earlier would close it mid-request.
+     proxyConnection.getConnectionPool().releaseConnection(clientCnx);
+ });
}).exceptionally(ex -> {
// Failed to connect to backend broker
proxyConnection.ctx().writeAndFlush(
@@ -381,14 +389,17 @@ public class LookupProxyHandler {
}
command = Commands.newGetSchema(requestId, topic,
Optional.ofNullable(schemaVersion).map(BytesSchemaVersion::of));
- clientCnx.sendGetRawSchema(command, requestId).thenAccept(response -> {
- proxyConnection.ctx().writeAndFlush(
- Commands.newGetSchemaResponse(clientRequestId, response));
- }).exceptionally(ex -> {
- log.warn("[{}] Failed to get schema {}: {}", clientAddress, topic, ex);
- proxyConnection.ctx().writeAndFlush(
- Commands.newError(clientRequestId, ServerError.ServiceNotReady, ex.getMessage()));
- return null;
+ clientCnx.sendGetRawSchema(command, requestId).whenComplete((r, t) -> {
+ if (t != null) {
+ log.warn("[{}] Failed to get schema {}: {}", clientAddress, topic, t);
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newError(clientRequestId, ServerError.ServiceNotReady, t.getMessage()));
+ } else {
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newGetSchemaResponse(clientRequestId, r));
+ }
+
+ proxyConnection.getConnectionPool().releaseConnection(clientCnx);
});
}).exceptionally(ex -> {
// Failed to connect to backend broker