You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/10/17 01:44:19 UTC
[pulsar] branch master updated: [fix][test] AdvertisedListenersTest.setup (#17869)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b530184464c [fix][test] AdvertisedListenersTest.setup (#17869)
b530184464c is described below
commit b530184464ce0afe8b002c14ca62fbd96ea4dfcf
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Mon Oct 17 09:44:11 2022 +0800
[fix][test] AdvertisedListenersTest.setup (#17869)
---
.../mledger/impl/ManagedLedgerBkTest.java | 4 +-
.../bookkeeper/test/BookKeeperClusterTestCase.java | 9 ++-
.../apache/pulsar/broker/MultiBrokerBaseTest.java | 5 ++
.../loadbalance/AdvertisedListenersTest.java | 8 +--
.../protocol/SimpleProtocolHandlerTestsBase.java | 13 +++-
.../functions/worker/PulsarFunctionTlsTest.java | 11 +++-
.../org/apache/pulsar/common/util/PortManager.java | 69 ++++++++++++++++++++++
.../apache/pulsar/common/util/PortManagerTest.java | 37 ++++++++++++
.../bookkeeper/test/BookKeeperClusterTestCase.java | 8 ++-
.../extensions/SimpleProxyExtensionTestBase.java | 13 +++-
10 files changed, 158 insertions(+), 19 deletions(-)
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
index 3ad521eeb79..5c42e7f46bd 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -117,7 +118,7 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
metadataStore.unsetAlwaysFail();
bkc = new BookKeeperTestClient(baseClientConf);
- startNewBookie();
+ int port = startNewBookie();
// Reconnect a new bk client
factory.shutdown();
@@ -147,6 +148,7 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
assertEquals("entry-2", new String(entries.get(0).getData()));
entries.forEach(Entry::release);
factory.shutdown();
+ releaseLockedPort(port);
}
@Test
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index e4f1d470bf6..5518838be37 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -24,6 +24,7 @@
package org.apache.bookkeeper.test;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
+import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertFalse;
import com.google.common.base.Stopwatch;
@@ -62,7 +63,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.Auditor;
import org.apache.bookkeeper.replication.ReplicationWorker;
-import org.apache.bookkeeper.util.PortManager;
+import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
@@ -113,6 +114,7 @@ public abstract class BookKeeperClusterTestCase {
private boolean isAutoRecoveryEnabled;
protected ExecutorService executor;
+ private final List<Integer> bookiePorts = new ArrayList<>();
SynchronousQueue<Throwable> asyncExceptions = new SynchronousQueue<>();
protected void captureThrowable(Runnable c) {
@@ -264,7 +266,7 @@ public abstract class BookKeeperClusterTestCase {
// Create Bookie Servers (B1, B2, B3)
for (int i = 0; i < numBookies; i++) {
- startNewBookie();
+ bookiePorts.add(startNewBookie());
}
}
@@ -283,6 +285,7 @@ public abstract class BookKeeperClusterTestCase {
t.shutdown();
}
servers.clear();
+ bookiePorts.removeIf(PortManager::releaseLockedPort);
}
protected ServerConfiguration newServerConfiguration() throws Exception {
@@ -290,7 +293,7 @@ public abstract class BookKeeperClusterTestCase {
int port;
if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) {
- port = PortManager.nextFreePort();
+ port = nextLockedFreePort();
} else {
port = 0;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
index c00ae8cd0d3..5b78a32dc37 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
@@ -21,12 +21,14 @@ package org.apache.pulsar.broker;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -124,6 +126,9 @@ public abstract class MultiBrokerBaseTest extends MockedPulsarServiceBaseTest {
try {
pulsarService.getConfiguration().setBrokerShutdownTimeoutMs(0L);
pulsarService.close();
+ pulsarService.getConfiguration().getBrokerServicePort().ifPresent(PortManager::releaseLockedPort);
+ pulsarService.getConfiguration().getWebServicePort().ifPresent(PortManager::releaseLockedPort);
+ pulsarService.getConfiguration().getWebServicePortTls().ifPresent(PortManager::releaseLockedPort);
} catch (PulsarServerException e) {
// ignore
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
index 489efa5755b..9ca4510a209 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance;
+import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -25,7 +26,6 @@ import java.net.URI;
import java.util.Optional;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.util.PortManager;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -66,9 +66,9 @@ public class AdvertisedListenersTest extends MultiBrokerBaseTest {
}
private void updateConfig(ServiceConfiguration conf, String advertisedAddress) {
- int pulsarPort = PortManager.nextFreePort();
- int httpPort = PortManager.nextFreePort();
- int httpsPort = PortManager.nextFreePort();
+ int pulsarPort = nextLockedFreePort();
+ int httpPort = nextLockedFreePort();
+ int httpsPort = nextLockedFreePort();
// Use invalid domain name as identifier and instead make sure the advertised listeners work as intended
conf.setAdvertisedAddress(advertisedAddress);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
index 639ccf7ecd0..f08771934f6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
@@ -22,12 +22,12 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.util.PortManager;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.common.util.PortManager;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -39,11 +39,14 @@ import java.net.Socket;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
+import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertEquals;
@Slf4j
@@ -54,6 +57,8 @@ public abstract class SimpleProtocolHandlerTestsBase extends BrokerTestBase {
private ServiceConfiguration conf;
+ private final List<Integer> ports = new ArrayList<>();
+
@Override
public String protocolName() {
return "test";
@@ -81,7 +86,9 @@ public abstract class SimpleProtocolHandlerTestsBase extends BrokerTestBase {
@Override
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
- return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), PortManager.nextFreePort()),
+ int port = nextLockedFreePort();
+ this.ports.add(port);
+ return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port),
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
@@ -106,7 +113,7 @@ public abstract class SimpleProtocolHandlerTestsBase extends BrokerTestBase {
@Override
public void close() {
-
+ ports.removeIf(PortManager::releaseLockedPort);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 3aaf914b9eb..bfc9928a7de 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker;
+import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertEquals;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -33,7 +34,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.util.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
@@ -44,6 +44,7 @@ import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
@@ -86,8 +87,8 @@ public class PulsarFunctionTlsTest {
// start brokers
for (int i = 0; i < BROKER_COUNT; i++) {
- int brokerPort = PortManager.nextFreePort();
- int webPort = PortManager.nextFreePort();
+ int brokerPort = nextLockedFreePort();
+ int webPort = nextLockedFreePort();
ServiceConfiguration config = new ServiceConfiguration();
config.setBrokerShutdownTimeoutMs(0L);
@@ -196,6 +197,10 @@ public class PulsarFunctionTlsTest {
for (int i = 0; i < BROKER_COUNT; i++) {
if (pulsarServices[i] != null) {
pulsarServices[i].close();
+ pulsarServices[i].getConfiguration().
+ getBrokerServicePort().ifPresent(PortManager::releaseLockedPort);
+ pulsarServices[i].getConfiguration()
+ .getWebServicePort().ifPresent(PortManager::releaseLockedPort);
}
}
bkEnsemble.stop();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java
new file mode 100644
index 00000000000..b9df071fdbc
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.net.ServerSocket;
+import java.util.HashSet;
+import java.util.Set;
+
+public class PortManager {
+
+ private static final Set<Integer> PORTS = new HashSet<>();
+
+ /**
+ * Return a locked available port.
+ *
+ * @return locked available port.
+ */
+ public static synchronized int nextLockedFreePort() {
+ int exceptionCount = 0;
+ while (true) {
+ try (ServerSocket ss = new ServerSocket(0)) {
+ int port = ss.getLocalPort();
+ if (!checkPortIfLocked(port)) {
+ PORTS.add(port);
+ return port;
+ }
+ } catch (Exception e) {
+ exceptionCount++;
+ if (exceptionCount > 100) {
+ throw new RuntimeException("Unable to allocate socket port", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns whether the port was released successfully.
+ *
+ * @return whether the release is successful.
+ */
+ public static synchronized boolean releaseLockedPort(int lockedPort) {
+ return PORTS.remove(lockedPort);
+ }
+
+ /**
+ * Check port if locked.
+ *
+ * @return whether the port is locked.
+ */
+ public static synchronized boolean checkPortIfLocked(int lockedPort) {
+ return PORTS.contains(lockedPort);
+ }
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java
new file mode 100644
index 00000000000..88057ba943a
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import org.testng.annotations.Test;
+
+import static org.apache.pulsar.common.util.PortManager.checkPortIfLocked;
+import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
+import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+public class PortManagerTest {
+ @Test
+ public void testCheckPortIfLockedAndRemove() {
+ int port = nextLockedFreePort();
+ assertTrue(checkPortIfLocked(port));
+ assertTrue(releaseLockedPort(port));
+ assertFalse(checkPortIfLocked(port));
+ }
+}
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
index 7921e784f60..327562c77de 100644
--- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -27,6 +27,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
+import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertFalse;
import com.google.common.base.Stopwatch;
import java.io.File;
@@ -83,7 +84,7 @@ import org.apache.bookkeeper.test.TmpDirs;
import org.apache.bookkeeper.test.ZooKeeperCluster;
import org.apache.bookkeeper.test.ZooKeeperClusterUtil;
import org.apache.bookkeeper.util.DiskChecker;
-import org.apache.bookkeeper.util.PortManager;
+import org.apache.pulsar.common.util.PortManager;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
@@ -129,6 +130,8 @@ public abstract class BookKeeperClusterTestCase {
private boolean isAutoRecoveryEnabled;
+ private final List<Integer> bookiePorts = new ArrayList<>();
+
SynchronousQueue<Throwable> asyncExceptions = new SynchronousQueue<>();
protected void captureThrowable(Runnable c) {
try {
@@ -282,6 +285,7 @@ public abstract class BookKeeperClusterTestCase {
t.shutdown();
}
servers.clear();
+ bookiePorts.removeIf(PortManager::releaseLockedPort);
}
protected ServerConfiguration newServerConfiguration() throws Exception {
@@ -289,7 +293,7 @@ public abstract class BookKeeperClusterTestCase {
int port;
if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) {
- port = PortManager.nextFreePort();
+ port = nextLockedFreePort();
} else {
port = 0;
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
index 0cc4bbb6bb5..c779acb6ebe 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
@@ -22,12 +22,12 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.util.PortManager;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
@@ -43,12 +43,15 @@ import java.net.Socket;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
+import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;
@@ -60,6 +63,8 @@ public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBa
private ProxyConfiguration conf;
+ private final List<Integer> ports = new ArrayList<>();
+
@Override
public String extensionName() {
return "test";
@@ -81,7 +86,9 @@ public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBa
@Override
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
- return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), PortManager.nextFreePort()),
+ int port = nextLockedFreePort();
+ this.ports.add(port);
+ return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port),
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
@@ -106,7 +113,7 @@ public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBa
@Override
public void close() {
-
+ ports.removeIf(PortManager::releaseLockedPort);
}
}