You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2024/01/11 00:07:44 UTC

(pulsar) branch master updated: [fix][test] Fix thread leak in InjectedClientCnxClientBuilder and EnableProxyProtocolTest (#21878)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ce09f55ea94 [fix][test] Fix thread leak in InjectedClientCnxClientBuilder and EnableProxyProtocolTest (#21878)
ce09f55ea94 is described below

commit ce09f55ea94912434515495bf1ca0d8d094dfb5e
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Jan 10 16:07:37 2024 -0800

    [fix][test] Fix thread leak in InjectedClientCnxClientBuilder and EnableProxyProtocolTest (#21878)
---
 .../broker/service/EnableProxyProtocolTest.java    | 13 ++-------
 .../client/api/InjectedClientCnxClientBuilder.java | 32 +++++++++++++++++++++-
 2 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java
index 33e797fcb21..a596e1ed32d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java
@@ -23,12 +23,10 @@ import io.netty.channel.ChannelHandlerContext;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.BrokerTestUtil;
-import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.ClientCnx;
@@ -51,15 +49,6 @@ public class EnableProxyProtocolTest extends BrokerTestBase  {
         super.baseSetup();
     }
 
-    protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
-        ClientBuilder clientBuilder =
-                PulsarClient.builder()
-                        .serviceUrl(url)
-                        .statsInterval(intervalInSecs, TimeUnit.SECONDS);
-        customizeNewPulsarClientBuilder(clientBuilder);
-        return createNewPulsarClient(clientBuilder);
-    }
-
     @AfterClass(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
@@ -108,6 +97,7 @@ public class EnableProxyProtocolTest extends BrokerTestBase  {
 
         // Create a client that injected the protocol implementation.
         ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
+        @Cleanup
         PulsarClientImpl protocolClient = InjectedClientCnxClientBuilder.create(clientBuilder,
                 (conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) {
                     public void channelActive(ChannelHandlerContext ctx) throws Exception {
@@ -132,6 +122,7 @@ public class EnableProxyProtocolTest extends BrokerTestBase  {
 
         // Create a client that injected the protocol implementation.
         ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
+        @Cleanup
         PulsarClientImpl protocolClient = InjectedClientCnxClientBuilder.create(clientBuilder,
                 (conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) {
                     public void channelActive(ChannelHandlerContext ctx) throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java
index d29dd4f7061..2a790824270 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java
@@ -19,7 +19,10 @@
 package org.apache.pulsar.client.api;
 
 import io.netty.channel.EventLoopGroup;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConnectionPool;
@@ -42,11 +45,38 @@ public class InjectedClientCnxClientBuilder {
         ConnectionPool pool = new ConnectionPool(conf, eventLoopGroup,
                 () -> clientCnxFactory.generate(conf, eventLoopGroup));
 
-        return new PulsarClientImpl(conf, eventLoopGroup, pool);
+        return new InjectedClientCnxPulsarClientImpl(conf, eventLoopGroup, pool);
     }
 
     public interface ClientCnxFactory {
 
         ClientCnx generate(ClientConfigurationData conf, EventLoopGroup eventLoopGroup);
     }
+
+    @Slf4j
+    private static class InjectedClientCnxPulsarClientImpl extends PulsarClientImpl {
+
+        public InjectedClientCnxPulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
+                                                 ConnectionPool pool)
+                throws PulsarClientException {
+            super(conf, eventLoopGroup, pool);
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            return super.closeAsync().handle((v, ex) -> {
+                try {
+                    getCnxPool().close();
+                } catch (Exception e) {
+                    log.warn("Failed to close cnx pool", e);
+                }
+                try {
+                    eventLoopGroup.shutdownGracefully().get(10, TimeUnit.SECONDS);
+                } catch (Exception e) {
+                    log.warn("Failed to shutdown event loop group", e);
+                }
+                return null;
+            });
+        }
+    }
 }