You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2024/01/11 00:07:44 UTC
(pulsar) branch master updated: [fix][test] Fix thread leak in InjectedClientCnxClientBuilder and EnableProxyProtocolTest (#21878)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ce09f55ea94 [fix][test] Fix thread leak in InjectedClientCnxClientBuilder and EnableProxyProtocolTest (#21878)
ce09f55ea94 is described below
commit ce09f55ea94912434515495bf1ca0d8d094dfb5e
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Jan 10 16:07:37 2024 -0800
[fix][test] Fix thread leak in InjectedClientCnxClientBuilder and EnableProxyProtocolTest (#21878)
---
.../broker/service/EnableProxyProtocolTest.java | 13 ++-------
.../client/api/InjectedClientCnxClientBuilder.java | 32 +++++++++++++++++++++-
2 files changed, 33 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java
index 33e797fcb21..a596e1ed32d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java
@@ -23,12 +23,10 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
-import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
@@ -51,15 +49,6 @@ public class EnableProxyProtocolTest extends BrokerTestBase {
super.baseSetup();
}
- protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
- ClientBuilder clientBuilder =
- PulsarClient.builder()
- .serviceUrl(url)
- .statsInterval(intervalInSecs, TimeUnit.SECONDS);
- customizeNewPulsarClientBuilder(clientBuilder);
- return createNewPulsarClient(clientBuilder);
- }
-
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
@@ -108,6 +97,7 @@ public class EnableProxyProtocolTest extends BrokerTestBase {
// Create a client that injected the protocol implementation.
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
+ @Cleanup
PulsarClientImpl protocolClient = InjectedClientCnxClientBuilder.create(clientBuilder,
(conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
@@ -132,6 +122,7 @@ public class EnableProxyProtocolTest extends BrokerTestBase {
// Create a client that injected the protocol implementation.
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
+ @Cleanup
PulsarClientImpl protocolClient = InjectedClientCnxClientBuilder.create(clientBuilder,
(conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java
index d29dd4f7061..2a790824270 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java
@@ -19,7 +19,10 @@
package org.apache.pulsar.client.api;
import io.netty.channel.EventLoopGroup;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
@@ -42,11 +45,38 @@ public class InjectedClientCnxClientBuilder {
ConnectionPool pool = new ConnectionPool(conf, eventLoopGroup,
() -> clientCnxFactory.generate(conf, eventLoopGroup));
- return new PulsarClientImpl(conf, eventLoopGroup, pool);
+ return new InjectedClientCnxPulsarClientImpl(conf, eventLoopGroup, pool);
}
public interface ClientCnxFactory {
ClientCnx generate(ClientConfigurationData conf, EventLoopGroup eventLoopGroup);
}
+
+ @Slf4j
+ private static class InjectedClientCnxPulsarClientImpl extends PulsarClientImpl {
+
+ public InjectedClientCnxPulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
+ ConnectionPool pool)
+ throws PulsarClientException {
+ super(conf, eventLoopGroup, pool);
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return super.closeAsync().handle((v, ex) -> {
+ try {
+ getCnxPool().close();
+ } catch (Exception e) {
+ log.warn("Failed to close cnx pool", e);
+ }
+ try {
+ eventLoopGroup.shutdownGracefully().get(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.warn("Failed to shutdown event loop group", e);
+ }
+ return null;
+ });
+ }
+ }
}