You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/10/17 01:44:19 UTC

[pulsar] branch master updated: [fix][test] AdvertisedListenersTest.setup (#17869)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b530184464c [fix][test] AdvertisedListenersTest.setup (#17869)
b530184464c is described below

commit b530184464ce0afe8b002c14ca62fbd96ea4dfcf
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Mon Oct 17 09:44:11 2022 +0800

    [fix][test] AdvertisedListenersTest.setup (#17869)
---
 .../mledger/impl/ManagedLedgerBkTest.java          |  4 +-
 .../bookkeeper/test/BookKeeperClusterTestCase.java |  9 ++-
 .../apache/pulsar/broker/MultiBrokerBaseTest.java  |  5 ++
 .../loadbalance/AdvertisedListenersTest.java       |  8 +--
 .../protocol/SimpleProtocolHandlerTestsBase.java   | 13 +++-
 .../functions/worker/PulsarFunctionTlsTest.java    | 11 +++-
 .../org/apache/pulsar/common/util/PortManager.java | 69 ++++++++++++++++++++++
 .../apache/pulsar/common/util/PortManagerTest.java | 37 ++++++++++++
 .../bookkeeper/test/BookKeeperClusterTestCase.java |  8 ++-
 .../extensions/SimpleProxyExtensionTestBase.java   | 13 +++-
 10 files changed, 158 insertions(+), 19 deletions(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
index 3ad521eeb79..5c42e7f46bd 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -117,7 +118,7 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
         metadataStore.unsetAlwaysFail();
 
         bkc = new BookKeeperTestClient(baseClientConf);
-        startNewBookie();
+        int port = startNewBookie();
 
         // Reconnect a new bk client
         factory.shutdown();
@@ -147,6 +148,7 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
         assertEquals("entry-2", new String(entries.get(0).getData()));
         entries.forEach(Entry::release);
         factory.shutdown();
+        releaseLockedPort(port);
     }
 
     @Test
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index e4f1d470bf6..5518838be37 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -24,6 +24,7 @@
 package org.apache.bookkeeper.test;
 
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
+import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
 import static org.testng.Assert.assertFalse;
 
 import com.google.common.base.Stopwatch;
@@ -62,7 +63,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.Auditor;
 import org.apache.bookkeeper.replication.ReplicationWorker;
-import org.apache.bookkeeper.util.PortManager;
+import org.apache.pulsar.common.util.PortManager;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
@@ -113,6 +114,7 @@ public abstract class BookKeeperClusterTestCase {
 
     private boolean isAutoRecoveryEnabled;
     protected ExecutorService executor;
+    private final List<Integer> bookiePorts = new ArrayList<>();
 
     SynchronousQueue<Throwable> asyncExceptions = new SynchronousQueue<>();
     protected void captureThrowable(Runnable c) {
@@ -264,7 +266,7 @@ public abstract class BookKeeperClusterTestCase {
 
         // Create Bookie Servers (B1, B2, B3)
         for (int i = 0; i < numBookies; i++) {
-            startNewBookie();
+            bookiePorts.add(startNewBookie());
         }
     }
 
@@ -283,6 +285,7 @@ public abstract class BookKeeperClusterTestCase {
             t.shutdown();
         }
         servers.clear();
+        bookiePorts.removeIf(PortManager::releaseLockedPort);
     }
 
     protected ServerConfiguration newServerConfiguration() throws Exception {
@@ -290,7 +293,7 @@ public abstract class BookKeeperClusterTestCase {
 
         int port;
         if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) {
-            port = PortManager.nextFreePort();
+            port = nextLockedFreePort();
         } else {
             port = 0;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
index c00ae8cd0d3..5b78a32dc37 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
@@ -21,12 +21,14 @@ package org.apache.pulsar.broker;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.util.PortManager;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -124,6 +126,9 @@ public abstract class MultiBrokerBaseTest extends MockedPulsarServiceBaseTest {
                 try {
                     pulsarService.getConfiguration().setBrokerShutdownTimeoutMs(0L);
                     pulsarService.close();
+                    pulsarService.getConfiguration().getBrokerServicePort().ifPresent(PortManager::releaseLockedPort);
+                    pulsarService.getConfiguration().getWebServicePort().ifPresent(PortManager::releaseLockedPort);
+                    pulsarService.getConfiguration().getWebServicePortTls().ifPresent(PortManager::releaseLockedPort);
                 } catch (PulsarServerException e) {
                     // ignore
                 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
index 489efa5755b..9ca4510a209 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.loadbalance;
 
+import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 
@@ -25,7 +26,6 @@ import java.net.URI;
 import java.util.Optional;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.util.PortManager;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -66,9 +66,9 @@ public class AdvertisedListenersTest extends MultiBrokerBaseTest {
     }
 
     private void updateConfig(ServiceConfiguration conf, String advertisedAddress) {
-        int pulsarPort = PortManager.nextFreePort();
-        int httpPort = PortManager.nextFreePort();
-        int httpsPort = PortManager.nextFreePort();
+        int pulsarPort = nextLockedFreePort();
+        int httpPort = nextLockedFreePort();
+        int httpsPort = nextLockedFreePort();
 
         // Use invalid domain name as identifier and instead make sure the advertised listeners work as intended
         conf.setAdvertisedAddress(advertisedAddress);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
index 639ccf7ecd0..f08771934f6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
@@ -22,12 +22,12 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.*;
 import io.netty.channel.socket.SocketChannel;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.util.PortManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.common.util.PortManager;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -39,11 +39,14 @@ import java.net.Socket;
 import java.net.SocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
+import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
 import static org.testng.Assert.assertEquals;
 
 @Slf4j
@@ -54,6 +57,8 @@ public abstract class SimpleProtocolHandlerTestsBase extends BrokerTestBase {
 
         private ServiceConfiguration conf;
 
+        private final List<Integer> ports = new ArrayList<>();
+
         @Override
         public String protocolName() {
             return "test";
@@ -81,7 +86,9 @@ public abstract class SimpleProtocolHandlerTestsBase extends BrokerTestBase {
 
         @Override
         public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
-            return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), PortManager.nextFreePort()),
+            int port = nextLockedFreePort();
+            this.ports.add(port);
+            return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port),
                     new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel socketChannel) throws Exception {
@@ -106,7 +113,7 @@ public abstract class SimpleProtocolHandlerTestsBase extends BrokerTestBase {
 
         @Override
         public void close() {
-
+            ports.removeIf(PortManager::releaseLockedPort);
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 3aaf914b9eb..bfc9928a7de 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
 import static org.testng.Assert.assertEquals;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -33,7 +34,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.util.PortManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
@@ -44,6 +44,7 @@ import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.PortManager;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
@@ -86,8 +87,8 @@ public class PulsarFunctionTlsTest {
 
         // start brokers
         for (int i = 0; i < BROKER_COUNT; i++) {
-            int brokerPort = PortManager.nextFreePort();
-            int webPort = PortManager.nextFreePort();
+            int brokerPort = nextLockedFreePort();
+            int webPort = nextLockedFreePort();
 
             ServiceConfiguration config = new ServiceConfiguration();
             config.setBrokerShutdownTimeoutMs(0L);
@@ -196,6 +197,10 @@ public class PulsarFunctionTlsTest {
             for (int i = 0; i < BROKER_COUNT; i++) {
                 if (pulsarServices[i] != null) {
                     pulsarServices[i].close();
+                    pulsarServices[i].getConfiguration().
+                            getBrokerServicePort().ifPresent(PortManager::releaseLockedPort);
+                    pulsarServices[i].getConfiguration()
+                            .getWebServicePort().ifPresent(PortManager::releaseLockedPort);
                 }
             }
             bkEnsemble.stop();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java
new file mode 100644
index 00000000000..b9df071fdbc
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.net.ServerSocket;
+import java.util.HashSet;
+import java.util.Set;
+
+public class PortManager {
+
+    private static final Set<Integer> PORTS = new HashSet<>();
+
+    /**
+     * Return a locked available port.
+     *
+     * @return locked available port.
+     */
+    public static synchronized int nextLockedFreePort() {
+        int exceptionCount = 0;
+        while (true) {
+            try (ServerSocket ss = new ServerSocket(0)) {
+                int port = ss.getLocalPort();
+                if (!checkPortIfLocked(port)) {
+                    PORTS.add(port);
+                    return port;
+                }
+            } catch (Exception e) {
+                exceptionCount++;
+                if (exceptionCount > 100) {
+                    throw new RuntimeException("Unable to allocate socket port", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns whether the port was released successfully.
+     *
+     * @return whether the release is successful.
+     */
+    public static synchronized boolean releaseLockedPort(int lockedPort) {
+        return PORTS.remove(lockedPort);
+    }
+
+    /**
+     * Check port if locked.
+     *
+     * @return whether the port is locked.
+     */
+    public static synchronized boolean checkPortIfLocked(int lockedPort) {
+        return PORTS.contains(lockedPort);
+    }
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java
new file mode 100644
index 00000000000..88057ba943a
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import org.testng.annotations.Test;
+
+import static org.apache.pulsar.common.util.PortManager.checkPortIfLocked;
+import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
+import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+public class PortManagerTest {
+    @Test
+    public void testCheckPortIfLockedAndRemove() {
+        int port = nextLockedFreePort();
+        assertTrue(checkPortIfLocked(port));
+        assertTrue(releaseLockedPort(port));
+        assertFalse(checkPortIfLocked(port));
+    }
+}
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
index 7921e784f60..327562c77de 100644
--- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -27,6 +27,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
+import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
 import static org.testng.Assert.assertFalse;
 import com.google.common.base.Stopwatch;
 import java.io.File;
@@ -83,7 +84,7 @@ import org.apache.bookkeeper.test.TmpDirs;
 import org.apache.bookkeeper.test.ZooKeeperCluster;
 import org.apache.bookkeeper.test.ZooKeeperClusterUtil;
 import org.apache.bookkeeper.util.DiskChecker;
-import org.apache.bookkeeper.util.PortManager;
+import org.apache.pulsar.common.util.PortManager;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
@@ -129,6 +130,8 @@ public abstract class BookKeeperClusterTestCase {
 
     private boolean isAutoRecoveryEnabled;
 
+    private final List<Integer> bookiePorts = new ArrayList<>();
+
     SynchronousQueue<Throwable> asyncExceptions = new SynchronousQueue<>();
     protected void captureThrowable(Runnable c) {
         try {
@@ -282,6 +285,7 @@ public abstract class BookKeeperClusterTestCase {
             t.shutdown();
         }
         servers.clear();
+        bookiePorts.removeIf(PortManager::releaseLockedPort);
     }
 
     protected ServerConfiguration newServerConfiguration() throws Exception {
@@ -289,7 +293,7 @@ public abstract class BookKeeperClusterTestCase {
 
         int port;
         if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) {
-            port = PortManager.nextFreePort();
+            port = nextLockedFreePort();
         } else {
             port = 0;
         }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
index 0cc4bbb6bb5..c779acb6ebe 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
@@ -22,12 +22,12 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.*;
 import io.netty.channel.socket.SocketChannel;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.util.PortManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.PortManager;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.pulsar.proxy.server.ProxyConfiguration;
 import org.apache.pulsar.proxy.server.ProxyService;
@@ -43,12 +43,15 @@ import java.net.Socket;
 import java.net.SocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
+import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
 import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
 
@@ -60,6 +63,8 @@ public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBa
 
         private ProxyConfiguration conf;
 
+        private final List<Integer> ports = new ArrayList<>();
+
         @Override
         public String extensionName() {
             return "test";
@@ -81,7 +86,9 @@ public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBa
 
         @Override
         public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
-            return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), PortManager.nextFreePort()),
+            int port = nextLockedFreePort();
+            this.ports.add(port);
+            return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port),
                     new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel socketChannel) throws Exception {
@@ -106,7 +113,7 @@ public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBa
 
         @Override
         public void close() {
-
+            ports.removeIf(PortManager::releaseLockedPort);
         }
     }