You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2023/10/31 09:46:47 UTC
(pulsar) branch master updated: [fix][test] Add MockedPulsarServiceBaseTest.registerCloseable and continue fixing thread leaks (#21477)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 69740c85e11 [fix][test] Add MockedPulsarServiceBaseTest.registerCloseable and continue fixing thread leaks (#21477)
69740c85e11 is described below
commit 69740c85e111318c8f774294e0129b770fff15e6
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Oct 31 11:46:41 2023 +0200
[fix][test] Add MockedPulsarServiceBaseTest.registerCloseable and continue fixing thread leaks (#21477)
---
.../broker/auth/MockedPulsarServiceBaseTest.java | 21 +++++++
.../broker/service/PrecisePublishLimiterTest.java | 3 +
.../TransactionMetaStoreAssignmentTest.java | 3 +-
...kenOauth2AuthenticatedProducerConsumerTest.java | 20 ++++---
.../impl/HierarchyTopicAutoCreationTest.java | 5 +-
.../worker/PulsarFunctionPublishTest.java | 1 +
.../websocket/proxy/ProxyAuthenticationTest.java | 2 +-
.../websocket/proxy/ProxyAuthorizationTest.java | 2 +-
.../websocket/proxy/ProxyConfigurationTest.java | 2 +-
.../proxy/ProxyEncryptionPublishConsumeTest.java | 2 +-
.../websocket/proxy/ProxyIdleTimeoutTest.java | 2 +-
.../pulsar/websocket/proxy/ProxyPingTest.java | 2 +-
...roxyPublishConsumeClientSideEncryptionTest.java | 2 +-
.../websocket/proxy/ProxyPublishConsumeTest.java | 2 +-
.../proxy/ProxyPublishConsumeTlsTest.java | 2 +-
.../proxy/ProxyPublishConsumeWithoutZKTest.java | 2 +-
.../proxy/v1/V1_ProxyAuthenticationTest.java | 2 +-
.../apache/pulsar/proxy/server/ProxyService.java | 2 +-
.../pulsar/proxy/server/ProxyServiceStarter.java | 26 +++++++--
.../extensions/SimpleProxyExtensionTestBase.java | 5 +-
.../server/AdminProxyHandlerKeystoreTLSTest.java | 6 +-
.../proxy/server/AuthedAdminProxyHandlerTest.java | 6 +-
.../proxy/server/ProxyAdditionalServletTest.java | 5 +-
.../ProxyAuthenticatedProducerConsumerTest.java | 5 +-
.../server/ProxyConnectionThrottlingTest.java | 5 +-
.../proxy/server/ProxyDisableZeroCopyTest.java | 5 +-
.../server/ProxyEnableHAProxyProtocolTest.java | 5 +-
.../pulsar/proxy/server/ProxyIsAHttpProxyTest.java | 30 +++++-----
.../proxy/server/ProxyKeyStoreTlsTestWithAuth.java | 5 +-
.../server/ProxyKeyStoreTlsTestWithoutAuth.java | 5 +-
.../server/ProxyKeyStoreTlsTransportTest.java | 5 +-
.../proxy/server/ProxyLookupThrottlingTest.java | 5 +-
.../pulsar/proxy/server/ProxyMutualTlsTest.java | 5 +-
.../pulsar/proxy/server/ProxyParserTest.java | 68 +++++++++++-----------
.../proxy/server/ProxyPrometheusMetricsTest.java | 5 +-
.../proxy/server/ProxyServiceStarterTest.java | 4 ++
.../proxy/server/ProxyServiceTlsStarterTest.java | 4 ++
.../apache/pulsar/proxy/server/ProxyStatsTest.java | 5 +-
.../proxy/server/ProxyStuckConnectionTest.java | 5 +-
.../org/apache/pulsar/proxy/server/ProxyTest.java | 5 +-
.../apache/pulsar/proxy/server/ProxyTlsTest.java | 5 +-
.../pulsar/proxy/server/ProxyTlsTestWithAuth.java | 5 +-
.../server/ProxyWithJwtAuthorizationTest.java | 8 +--
.../SuperUserAuthedAdminProxyHandlerTest.java | 6 +-
.../server/UnauthedAdminProxyHandlerTest.java | 6 +-
45 files changed, 197 insertions(+), 129 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 9df84b45775..b8d75bd0fbc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -25,10 +25,12 @@ import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -140,6 +142,8 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
protected boolean enableBrokerInterceptor = false;
+ private final List<AutoCloseable> closeables = new ArrayList<>();
+
public MockedPulsarServiceBaseTest() {
resetConfig();
}
@@ -274,6 +278,8 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
pulsarTestContext = null;
}
resetConfig();
+ callCloseables(closeables);
+ closeables.clear();
onCleanup();
}
@@ -291,6 +297,21 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
}
+ protected <T extends AutoCloseable> T registerCloseable(T closeable) {
+ closeables.add(closeable);
+ return closeable;
+ }
+
+ private static void callCloseables(List<AutoCloseable> closeables) {
+ for (int i = closeables.size() - 1; i >= 0; i--) {
+ try {
+ closeables.get(i).close();
+ } catch (Exception e) {
+ log.error("Failure in calling close method", e);
+ }
+ }
+ }
+
protected abstract void setup() throws Exception;
protected abstract void cleanup() throws Exception;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java
index 73cb43d52b1..9d5cfe5b5d2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import lombok.Cleanup;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.testng.annotations.Test;
@@ -27,6 +28,7 @@ public class PrecisePublishLimiterTest {
@Test
void shouldResetMsgLimitAfterUpdate() {
+ @Cleanup
PrecisePublishLimiter precisePublishLimiter = new PrecisePublishLimiter(new PublishRate(), () -> {
});
precisePublishLimiter.update(new PublishRate(1, 1));
@@ -37,6 +39,7 @@ public class PrecisePublishLimiterTest {
@Test
void shouldResetBytesLimitAfterUpdate() {
+ @Cleanup
PrecisePublishLimiter precisePublishLimiter = new PrecisePublishLimiter(new PublishRate(), () -> {
});
precisePublishLimiter.update(new PublishRate(1, 1));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
index ac1f5702187..25ce90e1cf0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
@@ -61,6 +61,7 @@ public class TransactionMetaStoreAssignmentTest extends TransactionTestBase {
pulsarServiceList.remove(crashedMetaStore);
crashedMetaStore.close();
+ pulsarClient.close();
pulsarClient = buildClient();
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
@@ -90,7 +91,7 @@ public class TransactionMetaStoreAssignmentTest extends TransactionTestBase {
.removeTransactionMetadataStore(TransactionCoordinatorID.get(f)));
}
checkTransactionCoordinatorNum(0);
- buildClient();
+ pulsarClient = buildClient();
checkTransactionCoordinatorNum(16);
pulsarClient.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index ecf1278eab7..c24e1923619 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import com.google.common.collect.Sets;
+import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.file.Path;
@@ -112,21 +113,22 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
Path path = Paths.get(CREDENTIALS_FILE).toAbsolutePath();
log.info("Credentials File path: {}", path.toString());
- // AuthenticationOAuth2
- Authentication authentication = AuthenticationFactoryOAuth2.clientCredentials(
- new URL(server.getIssuer()),
- path.toUri().toURL(), // key file path
- audience
- );
-
closeAdmin();
admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
- .authentication(authentication)
+ .authentication(createAuthentication(path))
.build());
replacePulsarClient(PulsarClient.builder().serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString())
.statsInterval(0, TimeUnit.SECONDS)
- .authentication(authentication));
+ .authentication(createAuthentication(path)));
+ }
+
+ private Authentication createAuthentication(Path path) throws MalformedURLException {
+ return AuthenticationFactoryOAuth2.clientCredentials(
+ new URL(server.getIssuer()),
+ path.toUri().toURL(), // key file path
+ audience
+ );
}
@AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java
index 8c93b293c41..8ab94e29cfe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java
@@ -70,8 +70,9 @@ public class HierarchyTopicAutoCreationTest extends ProducerConsumerBase {
Assert.assertEquals(nsAutoTopicCreationOverride, expectedPolicies);
// Background invalidate cache
final MetadataCache<Policies> nsCache = pulsar.getPulsarResources().getNamespaceResources().getCache();
+ @Cleanup("interrupt")
final Thread t1 = new Thread(() -> {
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
nsCache.invalidate("/admin/policies/" + namespace);
}
});
@@ -90,7 +91,5 @@ public class HierarchyTopicAutoCreationTest extends ProducerConsumerBase {
// double-check policies
final AutoTopicCreationOverride actualPolicies2 = admin.namespaces().getAutoTopicCreation(namespace);
Assert.assertEquals(actualPolicies2, expectedPolicies);
-
- t1.interrupt();
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index d7e15f4ce83..9d7493733fe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -527,6 +527,7 @@ public class PulsarFunctionPublishTest {
log.info("dlog url: {}", url);
URI dlogUri = URI.create(url);
+ @Cleanup
Namespace dlogNamespace = NamespaceBuilder.newBuilder()
.conf(dlogConf)
.clientId("function-worker-" + workerConfig.getWorkerId())
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
index e4d7b8349ec..3f34857f59e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
@@ -83,7 +83,7 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
}
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index a3b26a4a9d1..d4f7c72bed0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -65,7 +65,7 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
config.setWebServicePort(Optional.of(0));
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
service.start();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
index 173948ab1be..85f512e1567 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
@@ -68,7 +68,7 @@ public class ProxyConfigurationTest extends ProducerConsumerBase {
config.setServiceUrl("http://localhost:8080");
config.getProperties().setProperty("brokerClient_lookupTimeoutMs", "100");
WebSocketService service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
service.start();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
index cf7304615f5..5234ca00578 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
@@ -74,7 +74,7 @@ public class ProxyEncryptionPublishConsumeTest extends ProducerConsumerBase {
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
config.setCryptoKeyReaderFactoryClassName(CryptoKeyReaderFactoryImpl.class.getName());
service = spy(new WebSocketService(config));
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
index ab5a43b115a..6c9c5deb0c3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
@@ -65,7 +65,7 @@ public class ProxyIdleTimeoutTest extends ProducerConsumerBase {
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
index 8ba92831389..b4ecb84f580 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
@@ -67,7 +67,7 @@ public class ProxyPingTest extends ProducerConsumerBase {
config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
config.setWebSocketPingDurationSeconds(2);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
index 16936d65fc2..d81c39be284 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
@@ -78,7 +78,7 @@ public class ProxyPublishConsumeClientSideEncryptionTest extends ProducerConsume
config.setClusterName("test");
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spy(new WebSocketService(config));
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 8c64e40f927..ad511580343 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -100,7 +100,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
config.setClusterName("test");
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
index 91cd4fab470..dca4964fc98 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
@@ -74,7 +74,7 @@ public class ProxyPublishConsumeTlsTest extends TlsProducerConsumerBase {
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
index 0a432406001..c3e75bcb4f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
@@ -59,7 +59,7 @@ public class ProxyPublishConsumeWithoutZKTest extends ProducerConsumerBase {
config.setServiceUrl(pulsar.getSafeWebServiceAddress());
config.setServiceUrlTls(pulsar.getWebServiceAddressTls());
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(service)
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
index 01c851290b6..9767be625a0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
@@ -84,7 +84,7 @@ public class V1_ProxyAuthenticationTest extends V1_ProducerConsumerBase {
}
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index a934b8b0784..216d9ea3085 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -388,7 +388,7 @@ public class ProxyService implements Closeable {
}
if (statsExecutor != null) {
- statsExecutor.shutdown();
+ statsExecutor.shutdownNow();
}
if (proxyAdditionalServlets != null) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 74273316413..485befa00ac 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -35,6 +35,7 @@ import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.util.datetime.FixedDateFormat;
@@ -101,6 +102,7 @@ public class ProxyServiceStarter {
private ProxyService proxyService;
private WebServer server;
+ private WebSocketService webSocketService;
private static boolean metricsInitialized;
public ProxyServiceStarter(String[] args) throws Exception {
@@ -228,7 +230,9 @@ public class ProxyServiceStarter {
metricsInitialized = true;
}
- addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider());
+ AtomicReference<WebSocketService> webSocketServiceRef = new AtomicReference<>();
+ addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider(), webSocketServiceRef);
+ webSocketService = webSocketServiceRef.get();
// start web-service
server.start();
@@ -242,6 +246,9 @@ public class ProxyServiceStarter {
if (server != null) {
server.stop();
}
+ if (webSocketService != null) {
+ webSocketService.close();
+ }
} catch (Exception e) {
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
} finally {
@@ -250,9 +257,17 @@ public class ProxyServiceStarter {
}
public static void addWebServerHandlers(WebServer server,
- ProxyConfiguration config,
- ProxyService service,
- BrokerDiscoveryProvider discoveryProvider) throws Exception {
+ ProxyConfiguration config,
+ ProxyService service,
+ BrokerDiscoveryProvider discoveryProvider) throws Exception {
+ addWebServerHandlers(server, config, service, discoveryProvider, null);
+ }
+
+ public static void addWebServerHandlers(WebServer server,
+ ProxyConfiguration config,
+ ProxyService service,
+ BrokerDiscoveryProvider discoveryProvider,
+ AtomicReference<WebSocketService> webSocketServiceRef) throws Exception {
// We can make 'status.html' publicly accessible without authentication since
// it does not contain any sensitive data.
server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(),
@@ -301,6 +316,9 @@ public class ProxyServiceStarter {
serviceConfiguration.setBrokerClientTlsEnabled(config.isTlsEnabledWithBroker());
WebSocketService webSocketService = new WebSocketService(createClusterData(config), serviceConfiguration);
webSocketService.start();
+ if (webSocketServiceRef != null) {
+ webSocketServiceRef.set(webSocketService);
+ }
final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService);
server.addServlet(WebSocketProducerServlet.SERVLET_PATH,
new ServletHolder(producerWebSocketServlet));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
index 79662097c3b..fde7c938d0a 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
@@ -143,8 +143,9 @@ public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBa
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
proxyService.start();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
index 604354e868e..bc2029861f4 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
@@ -102,11 +102,11 @@ public class AdminProxyHandlerKeystoreTLSTest extends MockedPulsarServiceBaseTes
proxyConfig.setBrokerClientAuthenticationParameters(String.format("keyStoreType:%s,keyStorePath:%s,keyStorePassword:%s",
KEYSTORE_TYPE, BROKER_KEYSTORE_FILE_PATH, BROKER_KEYSTORE_PW));
- resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
- new ZKMetadataStore(mockZooKeeperGlobal));
+ resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
+ registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
webServer = new WebServer(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig)));
- discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource));
+ discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null);
doReturn(report).when(discoveryProvider).nextBroker();
ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
index 100ea64dd2e..d83de9652cf 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
@@ -98,11 +98,11 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
proxyConfig.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
proxyConfig.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName()));
- resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
- new ZKMetadataStore(mockZooKeeperGlobal));
+ resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
+ registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
webServer = new WebServer(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig)));
- discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource));
+ discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null);
doReturn(report).when(discoveryProvider).nextBroker();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
index 17cd3c33e79..34ab22c7fc6 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
@@ -84,8 +84,9 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest {
proxyService = Mockito.spy(new ProxyService(proxyConfig,
new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
Optional<Integer> proxyLogLevel = Optional.of(2);
assertEquals(proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel());
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 85f44b8171c..1c93cb20c70 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -140,8 +140,9 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
proxyService.start();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 336f11ae19d..a070d1e84d3 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -61,8 +61,9 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
proxyConfig.setMaxConcurrentInboundConnectionsPerIp(NUM_CONCURRENT_INBOUND_CONNECTION);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
proxyService.start();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
index 3aa71413d54..37fd66cd7da 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
@@ -41,8 +41,9 @@ public class ProxyDisableZeroCopyTest extends ProxyTest {
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
proxyService.start();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
index 8b3092c6f51..5704ba55fed 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
@@ -63,8 +63,9 @@ public class ProxyEnableHAProxyProtocolTest extends MockedPulsarServiceBaseTest
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
proxyService.start();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
index 246dd9f85e3..90e15ede2f4 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
@@ -19,24 +19,20 @@
package org.apache.pulsar.proxy.server;
import static java.nio.charset.StandardCharsets.UTF_8;
-
import java.io.IOException;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BooleanSupplier;
-
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.Response;
-
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.resources.PulsarResources;
@@ -56,7 +52,6 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.ProcessorUtils;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.logging.LoggingFeature;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -81,8 +76,8 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
// Set number of CPU's to two for unit tests for running in resource constrained env.
ProcessorUtils.setAvailableProcessors(2);
- resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
- new ZKMetadataStore(mockZooKeeperGlobal));
+ resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
+ registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
backingServer1 = new Server(0);
backingServer1.setHandler(newHandler("server1"));
backingServer1.start();
@@ -164,6 +159,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
backingServer1.stop();
backingServer2.stop();
+ backingServer3.stop();
client.close();
}
@@ -204,7 +200,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
- new BrokerDiscoveryProvider(proxyConfig, resource));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
webServer.start();
try {
Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -233,7 +229,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
- new BrokerDiscoveryProvider(proxyConfig, resource));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
webServer.start();
try {
Response r1 = client.target(webServer.getServiceUri()).path("/server1/foobar").request().get();
@@ -264,7 +260,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
- new BrokerDiscoveryProvider(proxyConfig, resource));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
}
@@ -283,7 +279,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
- new BrokerDiscoveryProvider(proxyConfig, resource));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
webServer.start();
try {
Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -310,7 +306,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
- new BrokerDiscoveryProvider(proxyConfig, resource));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
webServer.start();
try {
Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -336,7 +332,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
- new BrokerDiscoveryProvider(proxyConfig, resource));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
webServer.start();
try {
Response r = client.target(webServer.getServiceUri()).path("/foo/bar/blah/foobar").request().get();
@@ -366,7 +362,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
WebServer webServerMaxUriLen8k = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen8k, proxyConfig, null,
- new BrokerDiscoveryProvider(proxyConfig, resource));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
webServerMaxUriLen8k.start();
try {
Response r = client.target(webServerMaxUriLen8k.getServiceUri()).path(longUri.toString()).request().get();
@@ -378,7 +374,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
proxyConfig.setHttpMaxRequestHeaderSize(12 * 1024);
WebServer webServerMaxUriLen12k = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen12k, proxyConfig, null,
- new BrokerDiscoveryProvider(proxyConfig, resource));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
webServerMaxUriLen12k.start();
try {
Response r = client.target(webServerMaxUriLen12k.getServiceUri()).path(longUri.toString()).request().get();
@@ -402,7 +398,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
- new BrokerDiscoveryProvider(proxyConfig, resource));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
webServer.start();
try {
Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -434,7 +430,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
- new BrokerDiscoveryProvider(proxyConfig, resource));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
webServer.start();
HttpClient httpClient = new HttpClient();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
index 88e7b269d6e..6a9745f0550 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
@@ -90,8 +90,9 @@ public class ProxyKeyStoreTlsTestWithAuth extends MockedPulsarServiceBaseTest {
proxyService = Mockito.spy(new ProxyService(proxyConfig,
new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
proxyService.start();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
index 5feef74e3b9..4ceb85a8524 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
@@ -77,8 +77,9 @@ public class ProxyKeyStoreTlsTestWithoutAuth extends MockedPulsarServiceBaseTest
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
proxyService.start();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
index 5c4e40ed65a..5ee03395b80 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
@@ -89,8 +89,9 @@ public class ProxyKeyStoreTlsTransportTest extends MockedPulsarServiceBaseTest {
proxyService = Mockito.spy(new ProxyService(proxyConfig,
new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
proxyService.start();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 1b63aa14dfe..167c3b19646 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -69,8 +69,9 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
proxyService = Mockito.spy(new ProxyService(proxyConfig, authenticationService));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
proxyService.start();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
index ad237c25397..08066f2e5bf 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
@@ -69,8 +69,9 @@ public class ProxyMutualTlsTest extends MockedPulsarServiceBaseTest {
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
proxyService.start();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
index 82cd702aa7f..0d93185f5e8 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkArgument;
@@ -23,14 +24,10 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;
-
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
-
import java.util.Optional;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -78,12 +75,13 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
proxyConfig.setProxyLogLevel(Optional.ofNullable(2));
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
- PulsarConfigurationLoader.convertFrom(proxyConfig))));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
Optional<Integer> proxyLogLevel = Optional.of(2);
- assertEquals( proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel());
+ assertEquals(proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel());
proxyService.start();
}
@@ -100,8 +98,9 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
.build();
- Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
- .create();
+ Producer<byte[]> producer =
+ client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
+ .create();
for (int i = 0; i < 10; i++) {
producer.send("test".getBytes());
@@ -114,10 +113,10 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
.build();
Producer<byte[]> producer = client.newProducer(Schema.BYTES)
- .topic("persistent://sample/test/local/producer-consumer-topic")
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
+ .topic("persistent://sample/test/local/producer-consumer-topic")
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
// Create a consumer directly attached to broker
Consumer<byte[]> consumer = pulsarClient.newConsumer()
@@ -149,9 +148,9 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2);
Producer<byte[]> producer = client.newProducer(Schema.BYTES)
- .topic("persistent://sample/test/local/partitioned-topic")
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+ .topic("persistent://sample/test/local/partitioned-topic")
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
// Create a consumer directly attached to broker
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://sample/test/local/partitioned-topic")
@@ -171,18 +170,18 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
public void testRegexSubscription() throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
- .connectionsPerBroker(5).ioThreads(5).build();
+ .connectionsPerBroker(5).ioThreads(5).build();
// create two topics by subscribing to a topic and closing it
try (Consumer<byte[]> ignored = client.newConsumer()
- .topic("persistent://sample/test/local/topic1")
- .subscriptionName("ignored")
- .subscribe()) {
+ .topic("persistent://sample/test/local/topic1")
+ .subscriptionName("ignored")
+ .subscribe()) {
}
try (Consumer<byte[]> ignored = client.newConsumer()
- .topic("persistent://sample/test/local/topic2")
- .subscriptionName("ignored")
- .subscribe()) {
+ .topic("persistent://sample/test/local/topic2")
+ .subscriptionName("ignored")
+ .subscribe()) {
}
String subName = "regex-sub-proxy-parser-test-" + System.currentTimeMillis();
@@ -190,16 +189,16 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
String regexSubscriptionPattern = "persistent://sample/test/local/topic.*";
log.info("Regex subscribe to topics {}", regexSubscriptionPattern);
try (Consumer<byte[]> consumer = client.newConsumer()
- .topicsPattern(regexSubscriptionPattern)
- .subscriptionName(subName)
- .subscribe()) {
+ .topicsPattern(regexSubscriptionPattern)
+ .subscriptionName(subName)
+ .subscribe()) {
log.info("Successfully subscribe to topics using regex {}", regexSubscriptionPattern);
final int numMessages = 20;
try (Producer<byte[]> producer = client.newProducer(Schema.BYTES)
- .topic("persistent://sample/test/local/topic1")
- .create()) {
+ .topic("persistent://sample/test/local/topic1")
+ .create()) {
for (int i = 0; i < numMessages; i++) {
producer.send(("message-" + i).getBytes(UTF_8));
}
@@ -219,8 +218,12 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl(proxyService.getServiceUrl());
+ @Cleanup("shutdownNow")
+ EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false,
+ new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon()));
@Cleanup
- PulsarClient client = getClientActiveConsumerChangeNotSupported(conf);
+ PulsarClient client = getClientActiveConsumerChangeNotSupported(conf,
+ eventLoopGroup);
Producer<byte[]> producer = client.newProducer().topic(topic).create();
Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(sub)
@@ -243,10 +246,9 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
((PulsarClientImpl) client).getCnxPool().close();
}
- private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
+ private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf,
+ final EventLoopGroup eventLoopGroup)
throws Exception {
- ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());
- EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);
ConnectionPool cnxPool = new ConnectionPool(conf, eventLoopGroup, () -> {
return new ClientCnx(conf, eventLoopGroup, ProtocolVersion.v11_VALUE) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
index 6948996ad46..b692987d17a 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
@@ -74,8 +74,9 @@ public class ProxyPrometheusMetricsTest extends MockedPulsarServiceBaseTest {
proxyService = Mockito.spy(new ProxyService(proxyConfig,
new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
proxyService.start();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index def58be6df3..925e8192e14 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -95,7 +95,9 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
@Test
public void testProduceAndConsumeMessageWithWebsocket() throws Exception {
+ @Cleanup("stop")
HttpClient producerClient = new HttpClient();
+ @Cleanup("stop")
WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient);
producerWebSocketClient.start();
MyWebSocket producerSocket = new MyWebSocket();
@@ -106,7 +108,9 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
produceRequest.setContext("context");
produceRequest.setPayload(Base64.getEncoder().encodeToString("my payload".getBytes()));
+ @Cleanup("stop")
HttpClient consumerClient = new HttpClient();
+ @Cleanup("stop")
WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient);
consumerWebSocketClient.start();
MyWebSocket consumerSocket = new MyWebSocket();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index 6247c2a66e8..b21162577a2 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -106,7 +106,9 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
@Test
public void testProduceAndConsumeMessageWithWebsocket() throws Exception {
+ @Cleanup("stop")
HttpClient producerClient = new HttpClient();
+ @Cleanup("stop")
WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient);
producerWebSocketClient.start();
MyWebSocket producerSocket = new MyWebSocket();
@@ -117,7 +119,9 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
produceRequest.setContext("context");
produceRequest.setPayload(Base64.getEncoder().encodeToString("my payload".getBytes()));
+ @Cleanup("stop")
HttpClient consumerClient = new HttpClient();
+ @Cleanup("stop")
WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient);
consumerWebSocketClient.start();
MyWebSocket consumerSocket = new MyWebSocket();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
index 140af88aae7..a2692f96dcc 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
@@ -77,8 +77,9 @@ public class ProxyStatsTest extends MockedPulsarServiceBaseTest {
proxyService = Mockito.spy(new ProxyService(proxyConfig,
new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
Optional<Integer> proxyLogLevel = Optional.of(2);
assertEquals(proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel());
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
index 97279659af6..79ea7c5d6a3 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
@@ -93,8 +93,9 @@ public class ProxyStuckConnectionTest extends MockedPulsarServiceBaseTest {
return new TestLookupProxyHandler(this, proxyConnection);
}
});
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
proxyService.start();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index e799e2e948a..51f7afee090 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -97,8 +97,9 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
proxyService.start();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index 64b0cd6b1a6..a1b27abece4 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -64,8 +64,9 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest {
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
proxyService.start();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
index 0f1fa74a209..f6dff8fc3ea 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
@@ -76,8 +76,9 @@ public class ProxyTlsTestWithAuth extends MockedPulsarServiceBaseTest {
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
proxyService.start();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index 37465b21322..f3302b637a1 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -409,8 +409,8 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
@Test
void testGetStatus() throws Exception {
log.info("-- Starting {} test --", methodName);
- final PulsarResources resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
- new ZKMetadataStore(mockZooKeeperGlobal));
+ final PulsarResources resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
+ registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
final AuthenticationService authService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
final WebServer webServer = new WebServer(proxyConfig, authService);
@@ -433,8 +433,8 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
void testGetMetrics() throws Exception {
log.info("-- Starting {} test --", methodName);
startProxy();
- PulsarResources resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
- new ZKMetadataStore(mockZooKeeperGlobal));
+ PulsarResources resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
+ registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
AuthenticationService authService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
proxyConfig.setAuthenticateMetricsEndpoint(false);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
index d3291c8fb91..a44e2a85efa 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
@@ -93,11 +93,11 @@ public class SuperUserAuthedAdminProxyHandlerTest extends MockedPulsarServiceBas
proxyConfig.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
proxyConfig.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName()));
- resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
- new ZKMetadataStore(mockZooKeeperGlobal));
+ resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
+ registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
webServer = new WebServer(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig)));
- discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource));
+ discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null);
doReturn(report).when(discoveryProvider).nextBroker();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index 14cd9f41d99..d239815ae81 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -79,9 +79,9 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
webServer = new WebServer(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig)));
- resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
- new ZKMetadataStore(mockZooKeeperGlobal));
- discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource));
+ resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
+ registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
+ discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
adminProxyHandler = new AdminProxyWrapper(proxyConfig, discoveryProvider);
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
webServer.addServlet("/admin", servletHolder);