You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2023/10/31 09:46:47 UTC

(pulsar) branch master updated: [fix][test] Add MockedPulsarServiceBaseTest.registerCloseable and continue fixing thread leaks (#21477)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 69740c85e11 [fix][test] Add MockedPulsarServiceBaseTest.registerCloseable and continue fixing thread leaks (#21477)
69740c85e11 is described below

commit 69740c85e111318c8f774294e0129b770fff15e6
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Oct 31 11:46:41 2023 +0200

    [fix][test] Add MockedPulsarServiceBaseTest.registerCloseable and continue fixing thread leaks (#21477)
---
 .../broker/auth/MockedPulsarServiceBaseTest.java   | 21 +++++++
 .../broker/service/PrecisePublishLimiterTest.java  |  3 +
 .../TransactionMetaStoreAssignmentTest.java        |  3 +-
 ...kenOauth2AuthenticatedProducerConsumerTest.java | 20 ++++---
 .../impl/HierarchyTopicAutoCreationTest.java       |  5 +-
 .../worker/PulsarFunctionPublishTest.java          |  1 +
 .../websocket/proxy/ProxyAuthenticationTest.java   |  2 +-
 .../websocket/proxy/ProxyAuthorizationTest.java    |  2 +-
 .../websocket/proxy/ProxyConfigurationTest.java    |  2 +-
 .../proxy/ProxyEncryptionPublishConsumeTest.java   |  2 +-
 .../websocket/proxy/ProxyIdleTimeoutTest.java      |  2 +-
 .../pulsar/websocket/proxy/ProxyPingTest.java      |  2 +-
 ...roxyPublishConsumeClientSideEncryptionTest.java |  2 +-
 .../websocket/proxy/ProxyPublishConsumeTest.java   |  2 +-
 .../proxy/ProxyPublishConsumeTlsTest.java          |  2 +-
 .../proxy/ProxyPublishConsumeWithoutZKTest.java    |  2 +-
 .../proxy/v1/V1_ProxyAuthenticationTest.java       |  2 +-
 .../apache/pulsar/proxy/server/ProxyService.java   |  2 +-
 .../pulsar/proxy/server/ProxyServiceStarter.java   | 26 +++++++--
 .../extensions/SimpleProxyExtensionTestBase.java   |  5 +-
 .../server/AdminProxyHandlerKeystoreTLSTest.java   |  6 +-
 .../proxy/server/AuthedAdminProxyHandlerTest.java  |  6 +-
 .../proxy/server/ProxyAdditionalServletTest.java   |  5 +-
 .../ProxyAuthenticatedProducerConsumerTest.java    |  5 +-
 .../server/ProxyConnectionThrottlingTest.java      |  5 +-
 .../proxy/server/ProxyDisableZeroCopyTest.java     |  5 +-
 .../server/ProxyEnableHAProxyProtocolTest.java     |  5 +-
 .../pulsar/proxy/server/ProxyIsAHttpProxyTest.java | 30 +++++-----
 .../proxy/server/ProxyKeyStoreTlsTestWithAuth.java |  5 +-
 .../server/ProxyKeyStoreTlsTestWithoutAuth.java    |  5 +-
 .../server/ProxyKeyStoreTlsTransportTest.java      |  5 +-
 .../proxy/server/ProxyLookupThrottlingTest.java    |  5 +-
 .../pulsar/proxy/server/ProxyMutualTlsTest.java    |  5 +-
 .../pulsar/proxy/server/ProxyParserTest.java       | 68 +++++++++++-----------
 .../proxy/server/ProxyPrometheusMetricsTest.java   |  5 +-
 .../proxy/server/ProxyServiceStarterTest.java      |  4 ++
 .../proxy/server/ProxyServiceTlsStarterTest.java   |  4 ++
 .../apache/pulsar/proxy/server/ProxyStatsTest.java |  5 +-
 .../proxy/server/ProxyStuckConnectionTest.java     |  5 +-
 .../org/apache/pulsar/proxy/server/ProxyTest.java  |  5 +-
 .../apache/pulsar/proxy/server/ProxyTlsTest.java   |  5 +-
 .../pulsar/proxy/server/ProxyTlsTestWithAuth.java  |  5 +-
 .../server/ProxyWithJwtAuthorizationTest.java      |  8 +--
 .../SuperUserAuthedAdminProxyHandlerTest.java      |  6 +-
 .../server/UnauthedAdminProxyHandlerTest.java      |  6 +-
 45 files changed, 197 insertions(+), 129 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 9df84b45775..b8d75bd0fbc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -25,10 +25,12 @@ import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -140,6 +142,8 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
 
     protected boolean enableBrokerInterceptor = false;
 
+    private final List<AutoCloseable> closeables = new ArrayList<>();
+
     public MockedPulsarServiceBaseTest() {
         resetConfig();
     }
@@ -274,6 +278,8 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
             pulsarTestContext = null;
         }
         resetConfig();
+        callCloseables(closeables);
+        closeables.clear();
         onCleanup();
     }
 
@@ -291,6 +297,21 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
 
     }
 
+    protected <T extends AutoCloseable> T registerCloseable(T closeable) {
+        closeables.add(closeable);
+        return closeable;
+    }
+
+    private static void callCloseables(List<AutoCloseable> closeables) {
+        for (int i = closeables.size() - 1; i >= 0; i--) {
+            try {
+                closeables.get(i).close();
+            } catch (Exception e) {
+                log.error("Failure in calling close method", e);
+            }
+        }
+    }
+
     protected abstract void setup() throws Exception;
 
     protected abstract void cleanup() throws Exception;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java
index 73cb43d52b1..9d5cfe5b5d2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisePublishLimiterTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
 
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
+import lombok.Cleanup;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.testng.annotations.Test;
 
@@ -27,6 +28,7 @@ public class PrecisePublishLimiterTest {
 
     @Test
     void shouldResetMsgLimitAfterUpdate() {
+        @Cleanup
         PrecisePublishLimiter precisePublishLimiter = new PrecisePublishLimiter(new PublishRate(), () -> {
         });
         precisePublishLimiter.update(new PublishRate(1, 1));
@@ -37,6 +39,7 @@ public class PrecisePublishLimiterTest {
 
     @Test
     void shouldResetBytesLimitAfterUpdate() {
+        @Cleanup
         PrecisePublishLimiter precisePublishLimiter = new PrecisePublishLimiter(new PublishRate(), () -> {
         });
         precisePublishLimiter.update(new PublishRate(1, 1));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
index ac1f5702187..25ce90e1cf0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
@@ -61,6 +61,7 @@ public class TransactionMetaStoreAssignmentTest extends TransactionTestBase {
         pulsarServiceList.remove(crashedMetaStore);
         crashedMetaStore.close();
 
+        pulsarClient.close();
         pulsarClient = buildClient();
         Awaitility.await().atMost(5, TimeUnit.SECONDS)
                 .untilAsserted(() -> {
@@ -90,7 +91,7 @@ public class TransactionMetaStoreAssignmentTest extends TransactionTestBase {
                     .removeTransactionMetadataStore(TransactionCoordinatorID.get(f)));
         }
         checkTransactionCoordinatorNum(0);
-        buildClient();
+        pulsarClient = buildClient();
         checkTransactionCoordinatorNum(16);
 
         pulsarClient.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index ecf1278eab7..c24e1923619 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import com.google.common.collect.Sets;
+import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
 import java.nio.file.Path;
@@ -112,21 +113,22 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
         Path path = Paths.get(CREDENTIALS_FILE).toAbsolutePath();
         log.info("Credentials File path: {}", path.toString());
 
-        // AuthenticationOAuth2
-        Authentication authentication = AuthenticationFactoryOAuth2.clientCredentials(
-                new URL(server.getIssuer()),
-                path.toUri().toURL(),  // key file path
-                audience
-        );
-
         closeAdmin();
         admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
-                .authentication(authentication)
+                .authentication(createAuthentication(path))
                 .build());
 
         replacePulsarClient(PulsarClient.builder().serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString())
                 .statsInterval(0, TimeUnit.SECONDS)
-                .authentication(authentication));
+                .authentication(createAuthentication(path)));
+    }
+
+    private Authentication createAuthentication(Path path) throws MalformedURLException {
+        return AuthenticationFactoryOAuth2.clientCredentials(
+                new URL(server.getIssuer()),
+                path.toUri().toURL(),  // key file path
+                audience
+        );
     }
 
     @AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java
index 8c93b293c41..8ab94e29cfe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java
@@ -70,8 +70,9 @@ public class HierarchyTopicAutoCreationTest extends ProducerConsumerBase {
         Assert.assertEquals(nsAutoTopicCreationOverride, expectedPolicies);
         // Background invalidate cache
         final MetadataCache<Policies> nsCache = pulsar.getPulsarResources().getNamespaceResources().getCache();
+        @Cleanup("interrupt")
         final Thread t1 = new Thread(() -> {
-            while (true) {
+            while (!Thread.currentThread().isInterrupted()) {
                 nsCache.invalidate("/admin/policies/" + namespace);
             }
         });
@@ -90,7 +91,5 @@ public class HierarchyTopicAutoCreationTest extends ProducerConsumerBase {
         // double-check policies
         final AutoTopicCreationOverride actualPolicies2 = admin.namespaces().getAutoTopicCreation(namespace);
         Assert.assertEquals(actualPolicies2, expectedPolicies);
-
-        t1.interrupt();
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index d7e15f4ce83..9d7493733fe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -527,6 +527,7 @@ public class PulsarFunctionPublishTest {
         log.info("dlog url: {}", url);
         URI dlogUri = URI.create(url);
 
+        @Cleanup
         Namespace dlogNamespace = NamespaceBuilder.newBuilder()
                 .conf(dlogConf)
                 .clientId("function-worker-" + workerConfig.getWorkerId())
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
index e4d7b8349ec..3f34857f59e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
@@ -83,7 +83,7 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
         }
 
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
                 .createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index a3b26a4a9d1..d4f7c72bed0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -65,7 +65,7 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
         config.setWebServicePort(Optional.of(0));
         config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
                 .createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
         service.start();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
index 173948ab1be..85f512e1567 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
@@ -68,7 +68,7 @@ public class ProxyConfigurationTest extends ProducerConsumerBase {
         config.setServiceUrl("http://localhost:8080");
         config.getProperties().setProperty("brokerClient_lookupTimeoutMs", "100");
         WebSocketService service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
                 .createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
         service.start();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
index cf7304615f5..5234ca00578 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
@@ -74,7 +74,7 @@ public class ProxyEncryptionPublishConsumeTest extends ProducerConsumerBase {
         config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         config.setCryptoKeyReaderFactoryClassName(CryptoKeyReaderFactoryImpl.class.getName());
         service = spy(new WebSocketService(config));
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
                 .createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
index ab5a43b115a..6c9c5deb0c3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
@@ -65,7 +65,7 @@ public class ProxyIdleTimeoutTest extends ProducerConsumerBase {
         config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
                 .createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
index 8ba92831389..b4ecb84f580 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
@@ -67,7 +67,7 @@ public class ProxyPingTest extends ProducerConsumerBase {
         config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
         config.setWebSocketPingDurationSeconds(2);
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
                 .createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
index 16936d65fc2..d81c39be284 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
@@ -78,7 +78,7 @@ public class ProxyPublishConsumeClientSideEncryptionTest extends ProducerConsume
         config.setClusterName("test");
         config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         service = spy(new WebSocketService(config));
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
                 .createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 8c64e40f927..ad511580343 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -100,7 +100,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
         config.setClusterName("test");
         config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
                 .createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
index 91cd4fab470..dca4964fc98 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
@@ -74,7 +74,7 @@ public class ProxyPublishConsumeTlsTest extends TlsProducerConsumerBase {
         config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
         config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
                 .createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
index 0a432406001..c3e75bcb4f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
@@ -59,7 +59,7 @@ public class ProxyPublishConsumeWithoutZKTest extends ProducerConsumerBase {
         config.setServiceUrl(pulsar.getSafeWebServiceAddress());
         config.setServiceUrlTls(pulsar.getWebServiceAddressTls());
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(service)
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(service)
                 .createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
index 01c851290b6..9767be625a0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
@@ -84,7 +84,7 @@ public class V1_ProxyAuthenticationTest extends V1_ProducerConsumerBase {
         }
 
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service)
                 .createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index a934b8b0784..216d9ea3085 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -388,7 +388,7 @@ public class ProxyService implements Closeable {
         }
 
         if (statsExecutor != null) {
-            statsExecutor.shutdown();
+            statsExecutor.shutdownNow();
         }
 
         if (proxyAdditionalServlets != null) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 74273316413..485befa00ac 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -35,6 +35,7 @@ import java.text.SimpleDateFormat;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Getter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.core.util.datetime.FixedDateFormat;
@@ -101,6 +102,7 @@ public class ProxyServiceStarter {
     private ProxyService proxyService;
 
     private WebServer server;
+    private WebSocketService webSocketService;
     private static boolean metricsInitialized;
 
     public ProxyServiceStarter(String[] args) throws Exception {
@@ -228,7 +230,9 @@ public class ProxyServiceStarter {
             metricsInitialized = true;
         }
 
-        addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider());
+        AtomicReference<WebSocketService> webSocketServiceRef = new AtomicReference<>();
+        addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider(), webSocketServiceRef);
+        webSocketService = webSocketServiceRef.get();
 
         // start web-service
         server.start();
@@ -242,6 +246,9 @@ public class ProxyServiceStarter {
             if (server != null) {
                 server.stop();
             }
+            if (webSocketService != null) {
+                webSocketService.close();
+            }
         } catch (Exception e) {
             log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
         } finally {
@@ -250,9 +257,17 @@ public class ProxyServiceStarter {
     }
 
     public static void addWebServerHandlers(WebServer server,
-                                     ProxyConfiguration config,
-                                     ProxyService service,
-                                     BrokerDiscoveryProvider discoveryProvider) throws Exception {
+                                            ProxyConfiguration config,
+                                            ProxyService service,
+                                            BrokerDiscoveryProvider discoveryProvider) throws Exception {
+        addWebServerHandlers(server, config, service, discoveryProvider, null);
+    }
+
+    public static void addWebServerHandlers(WebServer server,
+                                            ProxyConfiguration config,
+                                            ProxyService service,
+                                            BrokerDiscoveryProvider discoveryProvider,
+                                            AtomicReference<WebSocketService> webSocketServiceRef) throws Exception {
         // We can make 'status.html' publicly accessible without authentication since
         // it does not contain any sensitive data.
         server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(),
@@ -301,6 +316,9 @@ public class ProxyServiceStarter {
             serviceConfiguration.setBrokerClientTlsEnabled(config.isTlsEnabledWithBroker());
             WebSocketService webSocketService = new WebSocketService(createClusterData(config), serviceConfiguration);
             webSocketService.start();
+            if (webSocketServiceRef != null) {
+                webSocketServiceRef.set(webSocketService);
+            }
             final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService);
             server.addServlet(WebSocketProducerServlet.SERVLET_PATH,
                     new ServletHolder(producerWebSocketServlet));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
index 79662097c3b..fde7c938d0a 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
@@ -143,8 +143,9 @@ public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBa
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
 
         proxyService.start();
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
index 604354e868e..bc2029861f4 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
@@ -102,11 +102,11 @@ public class AdminProxyHandlerKeystoreTLSTest extends MockedPulsarServiceBaseTes
         proxyConfig.setBrokerClientAuthenticationParameters(String.format("keyStoreType:%s,keyStorePath:%s,keyStorePassword:%s",
                 KEYSTORE_TYPE, BROKER_KEYSTORE_FILE_PATH, BROKER_KEYSTORE_PW));
 
-        resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
-                new ZKMetadataStore(mockZooKeeperGlobal));
+        resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
+                registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
         webServer = new WebServer(proxyConfig, new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig)));
-        discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource));
+        discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
         LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null);
         doReturn(report).when(discoveryProvider).nextBroker();
         ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
index 100ea64dd2e..d83de9652cf 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
@@ -98,11 +98,11 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
         proxyConfig.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
         proxyConfig.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName()));
 
-        resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
-                new ZKMetadataStore(mockZooKeeperGlobal));
+        resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
+                registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
         webServer = new WebServer(proxyConfig, new AuthenticationService(
                                           PulsarConfigurationLoader.convertFrom(proxyConfig)));
-        discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource));
+        discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
         LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null);
         doReturn(report).when(discoveryProvider).nextBroker();
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
index 17cd3c33e79..34ab22c7fc6 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
@@ -84,8 +84,9 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest {
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
                 new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
 
         Optional<Integer> proxyLogLevel = Optional.of(2);
         assertEquals(proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel());
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 85f44b8171c..1c93cb20c70 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -140,8 +140,9 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
                                                             PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
         proxyService.start();
     }
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 336f11ae19d..a070d1e84d3 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -61,8 +61,9 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
         proxyConfig.setMaxConcurrentInboundConnectionsPerIp(NUM_CONCURRENT_INBOUND_CONNECTION);
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
 
         proxyService.start();
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
index 3aa71413d54..37fd66cd7da 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
@@ -41,8 +41,9 @@ public class ProxyDisableZeroCopyTest extends ProxyTest {
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
 
         proxyService.start();
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
index 8b3092c6f51..5704ba55fed 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
@@ -63,8 +63,9 @@ public class ProxyEnableHAProxyProtocolTest extends MockedPulsarServiceBaseTest
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
 
         proxyService.start();
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
index 246dd9f85e3..90e15ede2f4 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
@@ -19,24 +19,20 @@
 package org.apache.pulsar.proxy.server;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-
 import java.io.IOException;
 import java.util.Properties;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.BooleanSupplier;
-
 import javax.servlet.AsyncContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.Response;
-
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.resources.PulsarResources;
@@ -56,7 +52,6 @@ import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.ProcessorUtils;
 import org.glassfish.jersey.client.ClientConfig;
 import org.glassfish.jersey.logging.LoggingFeature;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -81,8 +76,8 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
         // Set number of CPU's to two for unit tests for running in resource constrained env.
         ProcessorUtils.setAvailableProcessors(2);
 
-        resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
-                new ZKMetadataStore(mockZooKeeperGlobal));
+        resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
+                registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
         backingServer1 = new Server(0);
         backingServer1.setHandler(newHandler("server1"));
         backingServer1.start();
@@ -164,6 +159,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
 
         backingServer1.stop();
         backingServer2.stop();
+        backingServer3.stop();
         client.close();
     }
 
@@ -204,7 +200,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
         webServer.start();
         try {
             Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -233,7 +229,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
         webServer.start();
         try {
             Response r1 = client.target(webServer.getServiceUri()).path("/server1/foobar").request().get();
@@ -264,7 +260,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
 
     }
 
@@ -283,7 +279,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
         webServer.start();
         try {
             Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -310,7 +306,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
         webServer.start();
         try {
             Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -336,7 +332,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
         webServer.start();
         try {
             Response r = client.target(webServer.getServiceUri()).path("/foo/bar/blah/foobar").request().get();
@@ -366,7 +362,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
 
         WebServer webServerMaxUriLen8k = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen8k, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
         webServerMaxUriLen8k.start();
         try {
             Response r = client.target(webServerMaxUriLen8k.getServiceUri()).path(longUri.toString()).request().get();
@@ -378,7 +374,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
         proxyConfig.setHttpMaxRequestHeaderSize(12 * 1024);
         WebServer webServerMaxUriLen12k = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen12k, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
         webServerMaxUriLen12k.start();
         try {
             Response r = client.target(webServerMaxUriLen12k.getServiceUri()).path(longUri.toString()).request().get();
@@ -402,7 +398,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
         webServer.start();
         try {
             Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -434,7 +430,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest {
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
         webServer.start();
 
         HttpClient httpClient = new HttpClient();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
index 88e7b269d6e..6a9745f0550 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
@@ -90,8 +90,9 @@ public class ProxyKeyStoreTlsTestWithAuth extends MockedPulsarServiceBaseTest {
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
                                                     new AuthenticationService(
                                                             PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
 
         proxyService.start();
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
index 5feef74e3b9..4ceb85a8524 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
@@ -77,8 +77,9 @@ public class ProxyKeyStoreTlsTestWithoutAuth extends MockedPulsarServiceBaseTest
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
                                                             PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
 
         proxyService.start();
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
index 5c4e40ed65a..5ee03395b80 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
@@ -89,8 +89,9 @@ public class ProxyKeyStoreTlsTransportTest extends MockedPulsarServiceBaseTest {
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
                                                     new AuthenticationService(
                                                             PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
 
         proxyService.start();
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 1b63aa14dfe..167c3b19646 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -69,8 +69,9 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
         AuthenticationService authenticationService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
         proxyService = Mockito.spy(new ProxyService(proxyConfig, authenticationService));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
 
         proxyService.start();
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
index ad237c25397..08066f2e5bf 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
@@ -69,8 +69,9 @@ public class ProxyMutualTlsTest extends MockedPulsarServiceBaseTest {
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
                                                             PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
 
         proxyService.start();
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
index 82cd702aa7f..0d93185f5e8 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -23,14 +24,10 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.requireNonNull;
 import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
-
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
-
 import java.util.Optional;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-
 import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -78,12 +75,13 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
         proxyConfig.setProxyLogLevel(Optional.ofNullable(2));
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
-                                                            PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+                PulsarConfigurationLoader.convertFrom(proxyConfig))));
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
 
         Optional<Integer> proxyLogLevel = Optional.of(2);
-        assertEquals( proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel());
+        assertEquals(proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel());
         proxyService.start();
     }
 
@@ -100,8 +98,9 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
         @Cleanup
         PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
                 .build();
-        Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
-                .create();
+        Producer<byte[]> producer =
+                client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
+                        .create();
 
         for (int i = 0; i < 10; i++) {
             producer.send("test".getBytes());
@@ -114,10 +113,10 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
         PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
                 .build();
         Producer<byte[]> producer = client.newProducer(Schema.BYTES)
-            .topic("persistent://sample/test/local/producer-consumer-topic")
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
+                .topic("persistent://sample/test/local/producer-consumer-topic")
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
 
         // Create a consumer directly attached to broker
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
@@ -149,9 +148,9 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
         admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2);
 
         Producer<byte[]> producer = client.newProducer(Schema.BYTES)
-            .topic("persistent://sample/test/local/partitioned-topic")
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+                .topic("persistent://sample/test/local/partitioned-topic")
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
         // Create a consumer directly attached to broker
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://sample/test/local/partitioned-topic")
@@ -171,18 +170,18 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
     public void testRegexSubscription() throws Exception {
         @Cleanup
         PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
-            .connectionsPerBroker(5).ioThreads(5).build();
+                .connectionsPerBroker(5).ioThreads(5).build();
 
         // create two topics by subscribing to a topic and closing it
         try (Consumer<byte[]> ignored = client.newConsumer()
-            .topic("persistent://sample/test/local/topic1")
-            .subscriptionName("ignored")
-            .subscribe()) {
+                .topic("persistent://sample/test/local/topic1")
+                .subscriptionName("ignored")
+                .subscribe()) {
         }
         try (Consumer<byte[]> ignored = client.newConsumer()
-            .topic("persistent://sample/test/local/topic2")
-            .subscriptionName("ignored")
-            .subscribe()) {
+                .topic("persistent://sample/test/local/topic2")
+                .subscriptionName("ignored")
+                .subscribe()) {
         }
         String subName = "regex-sub-proxy-parser-test-" + System.currentTimeMillis();
 
@@ -190,16 +189,16 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
         String regexSubscriptionPattern = "persistent://sample/test/local/topic.*";
         log.info("Regex subscribe to topics {}", regexSubscriptionPattern);
         try (Consumer<byte[]> consumer = client.newConsumer()
-            .topicsPattern(regexSubscriptionPattern)
-            .subscriptionName(subName)
-            .subscribe()) {
+                .topicsPattern(regexSubscriptionPattern)
+                .subscriptionName(subName)
+                .subscribe()) {
             log.info("Successfully subscribe to topics using regex {}", regexSubscriptionPattern);
 
             final int numMessages = 20;
 
             try (Producer<byte[]> producer = client.newProducer(Schema.BYTES)
-                .topic("persistent://sample/test/local/topic1")
-                .create()) {
+                    .topic("persistent://sample/test/local/topic1")
+                    .create()) {
                 for (int i = 0; i < numMessages; i++) {
                     producer.send(("message-" + i).getBytes(UTF_8));
                 }
@@ -219,8 +218,12 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
 
         ClientConfigurationData conf = new ClientConfigurationData();
         conf.setServiceUrl(proxyService.getServiceUrl());
+        @Cleanup("shutdownNow")
+        EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false,
+                new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon()));
         @Cleanup
-        PulsarClient client = getClientActiveConsumerChangeNotSupported(conf);
+        PulsarClient client = getClientActiveConsumerChangeNotSupported(conf,
+                eventLoopGroup);
 
         Producer<byte[]> producer = client.newProducer().topic(topic).create();
         Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(sub)
@@ -243,10 +246,9 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
         ((PulsarClientImpl) client).getCnxPool().close();
     }
 
-    private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
+    private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf,
+                                                                          final EventLoopGroup eventLoopGroup)
             throws Exception {
-        ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());
-        EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);
 
         ConnectionPool cnxPool = new ConnectionPool(conf, eventLoopGroup, () -> {
             return new ClientCnx(conf, eventLoopGroup, ProtocolVersion.v11_VALUE) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
index 6948996ad46..b692987d17a 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
@@ -74,8 +74,9 @@ public class ProxyPrometheusMetricsTest extends MockedPulsarServiceBaseTest {
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
                 new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
 
         proxyService.start();
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index def58be6df3..925e8192e14 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -95,7 +95,9 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testProduceAndConsumeMessageWithWebsocket() throws Exception {
+        @Cleanup("stop")
         HttpClient producerClient = new HttpClient();
+        @Cleanup("stop")
         WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient);
         producerWebSocketClient.start();
         MyWebSocket producerSocket = new MyWebSocket();
@@ -106,7 +108,9 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
         produceRequest.setContext("context");
         produceRequest.setPayload(Base64.getEncoder().encodeToString("my payload".getBytes()));
 
+        @Cleanup("stop")
         HttpClient consumerClient = new HttpClient();
+        @Cleanup("stop")
         WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient);
         consumerWebSocketClient.start();
         MyWebSocket consumerSocket = new MyWebSocket();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index 6247c2a66e8..b21162577a2 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -106,7 +106,9 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testProduceAndConsumeMessageWithWebsocket() throws Exception {
+        @Cleanup("stop")
         HttpClient producerClient = new HttpClient();
+        @Cleanup("stop")
         WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient);
         producerWebSocketClient.start();
         MyWebSocket producerSocket = new MyWebSocket();
@@ -117,7 +119,9 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
         produceRequest.setContext("context");
         produceRequest.setPayload(Base64.getEncoder().encodeToString("my payload".getBytes()));
 
+        @Cleanup("stop")
         HttpClient consumerClient = new HttpClient();
+        @Cleanup("stop")
         WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient);
         consumerWebSocketClient.start();
         MyWebSocket consumerSocket = new MyWebSocket();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
index 140af88aae7..a2692f96dcc 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
@@ -77,8 +77,9 @@ public class ProxyStatsTest extends MockedPulsarServiceBaseTest {
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
                 new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
 
         Optional<Integer> proxyLogLevel = Optional.of(2);
         assertEquals(proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel());
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
index 97279659af6..79ea7c5d6a3 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
@@ -93,8 +93,9 @@ public class ProxyStuckConnectionTest extends MockedPulsarServiceBaseTest {
                 return new TestLookupProxyHandler(this, proxyConnection);
             }
         });
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
         proxyService.start();
     }
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index e799e2e948a..51f7afee090 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -97,8 +97,9 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
                                                             PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
 
         proxyService.start();
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index 64b0cd6b1a6..a1b27abece4 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -64,8 +64,9 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest {
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
                                                             PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
 
         proxyService.start();
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
index 0f1fa74a209..f6dff8fc3ea 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
@@ -76,8 +76,9 @@ public class ProxyTlsTestWithAuth extends MockedPulsarServiceBaseTest {
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
             PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
 
         proxyService.start();
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index 37465b21322..f3302b637a1 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -409,8 +409,8 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
     @Test
     void testGetStatus() throws Exception {
         log.info("-- Starting {} test --", methodName);
-        final PulsarResources resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
-                new ZKMetadataStore(mockZooKeeperGlobal));
+        final PulsarResources resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
+                registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
         final AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
         final WebServer webServer = new WebServer(proxyConfig, authService);
@@ -433,8 +433,8 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
     void testGetMetrics() throws Exception {
         log.info("-- Starting {} test --", methodName);
         startProxy();
-        PulsarResources resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
-                new ZKMetadataStore(mockZooKeeperGlobal));
+        PulsarResources resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
+                registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
         proxyConfig.setAuthenticateMetricsEndpoint(false);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
index d3291c8fb91..a44e2a85efa 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
@@ -93,11 +93,11 @@ public class SuperUserAuthedAdminProxyHandlerTest extends MockedPulsarServiceBas
         proxyConfig.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
         proxyConfig.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName()));
 
-        resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
-                new ZKMetadataStore(mockZooKeeperGlobal));
+        resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
+                registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
         webServer = new WebServer(proxyConfig, new AuthenticationService(
                                           PulsarConfigurationLoader.convertFrom(proxyConfig)));
-        discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource));
+        discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
         LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null);
         doReturn(report).when(discoveryProvider).nextBroker();
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index 14cd9f41d99..d239815ae81 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -79,9 +79,9 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
         webServer = new WebServer(proxyConfig, new AuthenticationService(
                                           PulsarConfigurationLoader.convertFrom(proxyConfig)));
 
-        resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
-                new ZKMetadataStore(mockZooKeeperGlobal));
-        discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource));
+        resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
+                registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
+        discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
         adminProxyHandler = new AdminProxyWrapper(proxyConfig, discoveryProvider);
         ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
         webServer.addServlet("/admin", servletHolder);