You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/05 03:28:51 UTC
[pulsar] branch master updated: Fixed intermittent test failures
with "bind error" (#2725)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 79dd772 Fixed intermittent test failures with "bind error" (#2725)
79dd772 is described below
commit 79dd772f61bdb78816247bc3ad24a59cdc8f7ac6
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Oct 4 20:28:44 2018 -0700
Fixed intermittent test failures with "bind error" (#2725)
---
.../apache/pulsar/broker/SLAMonitoringTest.java | 2 +-
.../AntiAffinityNamespaceGroupTest.java | 2 +-
.../broker/loadbalance/LoadBalancerTest.java | 2 +-
.../loadbalance/ModularLoadManagerImplTest.java | 2 +-
.../loadbalance/SimpleLoadManagerImplTest.java | 2 +-
.../broker/service/AdvertisedAddressTest.java | 3 +-
.../broker/service/BacklogQuotaManagerTest.java | 9 ++---
.../broker/service/BrokerBkEnsemblesTests.java | 10 +++---
.../pulsar/broker/service/ReplicatorTestBase.java | 6 ++--
.../broker/service/v1/V1_ReplicatorTestBase.java | 6 ++--
.../pulsar/client/api/NonPersistentTopicTest.java | 6 ++--
.../worker/PulsarWorkerAssignmentTest.java | 2 +-
.../apache/pulsar/io/PulsarFunctionAdminTest.java | 8 ++---
.../apache/pulsar/io/PulsarFunctionTlsTest.java | 2 +-
.../org/apache/pulsar/io/PulsarSinkE2ETest.java | 2 +-
pulsar-zookeeper-utils/pom.xml | 8 +++++
.../pulsar/zookeeper/LocalBookkeeperEnsemble.java | 41 ++++++++++++++++++----
.../zookeeper/LocalBookkeeperEnsembleTest.java | 16 ++++-----
18 files changed, 83 insertions(+), 46 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index cda002b..233d6b7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -72,7 +72,7 @@ public class SLAMonitoringTest {
void setup() throws Exception {
log.info("---- Initializing SLAMonitoringTest -----");
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
// start brokers
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 6c20c77..8088637 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -112,7 +112,7 @@ public class AntiAffinityNamespaceGroupTest {
void setup() throws Exception {
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
// Start broker 1
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index d9e5b78..b90fa6f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -120,7 +120,7 @@ public class LoadBalancerTest {
@BeforeMethod
void setup() throws Exception {
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
ZkUtils.createFullPathOptimistic(bkEnsemble.getZkClient(),
SimpleLoadManagerImpl.LOADBALANCER_DYNAMIC_SETTING_STRATEGY_ZPATH,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 863833a..0303fbf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -146,7 +146,7 @@ public class ModularLoadManagerImplTest {
void setup() throws Exception {
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
// Start broker 1
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 052c97e..54c51fc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -127,7 +127,7 @@ public class SimpleLoadManagerImplTest {
void setup() throws Exception {
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
// Start broker 1
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
index dda76b1..3990168 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
+import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -44,7 +45,7 @@ public class AdvertisedAddressTest {
@BeforeMethod
public void setup() throws Exception {
- bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 5001);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
ServiceConfiguration config = new ServiceConfiguration();
config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 0026559..97155ee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -53,7 +54,7 @@ import com.google.common.collect.Sets;
/**
*/
public class BacklogQuotaManagerTest {
- protected static int BROKER_SERVICE_PORT = 16650;
+ protected static int BROKER_SERVICE_PORT = PortManager.nextFreePort();
PulsarService pulsar;
ServiceConfiguration config;
@@ -62,15 +63,15 @@ public class BacklogQuotaManagerTest {
LocalBookkeeperEnsemble bkEnsemble;
- private final int ZOOKEEPER_PORT = 12759;
- protected final int BROKER_WEBSERVICE_PORT = 15782;
+ private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+ protected final int BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
@BeforeMethod
void setup() throws Exception {
try {
// start local bookie and zookeeper
- bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 5001);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
// start pulsar service
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 94823ad..cc82ff4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -60,7 +61,7 @@ import org.testng.annotations.Test;
/**
*/
public class BrokerBkEnsemblesTests {
- protected static int BROKER_SERVICE_PORT = 16650;
+ protected static int BROKER_SERVICE_PORT = PortManager.nextFreePort();
protected PulsarService pulsar;
ServiceConfiguration config;
@@ -69,10 +70,9 @@ public class BrokerBkEnsemblesTests {
LocalBookkeeperEnsemble bkEnsemble;
- private final int ZOOKEEPER_PORT = 12759;
- protected final int BROKER_WEBSERVICE_PORT = 15782;
+ private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+ protected final int BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
- protected final int bkBasePort = 5001;
private final int numberOfBookies;
public BrokerBkEnsemblesTests() {
@@ -87,7 +87,7 @@ public class BrokerBkEnsemblesTests {
protected void setup() throws Exception {
try {
// start local bookie and zookeeper
- bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, ZOOKEEPER_PORT, 5001);
+ bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
// start pulsar service
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 2f599ee..10754b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -105,7 +105,7 @@ public class ReplicatorTestBase {
// Start region 1
int zkPort1 = PortManager.nextFreePort();
- bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, PortManager.nextFreePort());
+ bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> PortManager.nextFreePort());
bkEnsemble1.start();
int webServicePort1 = PortManager.nextFreePort();
@@ -143,7 +143,7 @@ public class ReplicatorTestBase {
// Start zk & bks
int zkPort2 = PortManager.nextFreePort();
- bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, PortManager.nextFreePort());
+ bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> PortManager.nextFreePort());
bkEnsemble2.start();
int webServicePort2 = PortManager.nextFreePort();
@@ -177,7 +177,7 @@ public class ReplicatorTestBase {
// Start zk & bks
int zkPort3 = PortManager.nextFreePort();
- bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, PortManager.nextFreePort());
+ bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> PortManager.nextFreePort());
bkEnsemble3.start();
int webServicePort3 = PortManager.nextFreePort();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
index fc5001b..cdc7c06 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
@@ -104,7 +104,7 @@ public class V1_ReplicatorTestBase {
// Start region 1
int zkPort1 = PortManager.nextFreePort();
- bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, PortManager.nextFreePort());
+ bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> PortManager.nextFreePort());
bkEnsemble1.start();
int webServicePort1 = PortManager.nextFreePort();
@@ -142,7 +142,7 @@ public class V1_ReplicatorTestBase {
// Start zk & bks
int zkPort2 = PortManager.nextFreePort();
- bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, PortManager.nextFreePort());
+ bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> PortManager.nextFreePort());
bkEnsemble2.start();
int webServicePort2 = PortManager.nextFreePort();
@@ -176,7 +176,7 @@ public class V1_ReplicatorTestBase {
// Start zk & bks
int zkPort3 = PortManager.nextFreePort();
- bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, PortManager.nextFreePort());
+ bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> PortManager.nextFreePort());
bkEnsemble3.start();
int webServicePort3 = PortManager.nextFreePort();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 29901b5..2e7a444 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -871,7 +871,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
// Start region 1
int zkPort1 = PortManager.nextFreePort();
- bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, PortManager.nextFreePort());
+ bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> PortManager.nextFreePort());
bkEnsemble1.start();
int webServicePort1 = PortManager.nextFreePort();
@@ -901,7 +901,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
// Start zk & bks
int zkPort2 = PortManager.nextFreePort();
- bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, PortManager.nextFreePort());
+ bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> PortManager.nextFreePort());
bkEnsemble2.start();
int webServicePort2 = PortManager.nextFreePort();
@@ -927,7 +927,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
// Start zk & bks
int zkPort3 = PortManager.nextFreePort();
- bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, PortManager.nextFreePort());
+ bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> PortManager.nextFreePort());
bkEnsemble3.start();
int webServicePort3 = PortManager.nextFreePort();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 8f18259..0f9219d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -98,7 +98,7 @@ public class PulsarWorkerAssignmentTest {
log.info("--- Setting up method {} ---", method.getName());
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
String brokerServiceUrl = "http://127.0.0.1:" + brokerServicePort;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index a758c87..996d931 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -101,7 +101,7 @@ public class PulsarFunctionAdminTest {
log.info("--- Setting up method {} ---", method.getName());
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort;
@@ -126,7 +126,7 @@ public class PulsarFunctionAdminTest {
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsAllowInsecureConnection(true);
-
+
functionsWorkerService = createPulsarFunctionWorker(config);
urlTls = new URL(brokerServiceUrl);
@@ -160,11 +160,11 @@ public class PulsarFunctionAdminTest {
workerConfig.getClientAuthenticationParameters());
}
pulsarClient = clientBuilder.build();
-
+
TenantInfo propAdmin = new TenantInfo();
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
admin.tenants().updateTenant(tenant, propAdmin);
-
+
Thread.sleep(100);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 97de0b8..2b32fd0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -92,7 +92,7 @@ public class PulsarFunctionTlsTest {
log.info("--- Setting up method {} ---", method.getName());
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
config = spy(new ServiceConfiguration());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index dd39222..907cf86 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -133,7 +133,7 @@ public class PulsarSinkE2ETest {
log.info("--- Setting up method {} ---", method.getName());
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble.start();
String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort;
diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-zookeeper-utils/pom.xml
index 900dcf3..e30e4b1 100644
--- a/pulsar-zookeeper-utils/pom.xml
+++ b/pulsar-zookeeper-utils/pom.xml
@@ -66,6 +66,14 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>managed-ledger-original</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <version>${project.parent.version}</version>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-common</artifactId>
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index f7923e4..1ead796 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -39,6 +39,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
@@ -81,8 +82,25 @@ public class LocalBookkeeperEnsemble {
int numberOfBookies;
private boolean clearOldData = false;
- public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort) {
- this(numberOfBookies, zkPort, bkBasePort, null, null, true);
+ private static class BasePortManager implements Supplier<Integer> {
+
+ private int port;
+
+ public BasePortManager(int basePort) {
+ this.port = basePort;
+ }
+
+ @Override
+ public synchronized Integer get() {
+ return port++;
+ }
+ }
+
+ private final Supplier<Integer> portManager;
+
+
+ public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, Supplier<Integer> portManager) {
+ this(numberOfBookies, zkPort, 4181, null, null, true, null, portManager);
}
public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName,
@@ -103,10 +121,22 @@ public class LocalBookkeeperEnsemble {
String bkDataDirName,
boolean clearOldData,
String advertisedAddress) {
+ this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress,
+ new BasePortManager(bkBasePort));
+ }
+
+ public LocalBookkeeperEnsemble(int numberOfBookies,
+ int zkPort,
+ int streamStoragePort,
+ String zkDataDirName,
+ String bkDataDirName,
+ boolean clearOldData,
+ String advertisedAddress,
+ Supplier<Integer> portManager) {
this.numberOfBookies = numberOfBookies;
this.HOSTPORT = "127.0.0.1:" + zkPort;
this.ZooKeeperDefaultPort = zkPort;
- this.initialPort = bkBasePort;
+ this.portManager = portManager;
this.streamStoragePort = streamStoragePort;
this.zkDataDirName = zkDataDirName;
this.bkDataDirName = bkDataDirName;
@@ -128,7 +158,6 @@ public class LocalBookkeeperEnsemble {
String bkDataDirName;
BookieServer bs[];
ServerConfiguration bsConfs[];
- Integer initialPort = 5000;
// Stream/Table Storage
StreamStorageLifecycleComponent streamStorage;
@@ -221,7 +250,7 @@ public class LocalBookkeeperEnsemble {
cleanDirectory(bkDataDir);
}
- int bookiePort = initialPort + i;
+ int bookiePort = portManager.get();
// Ensure registration Z-nodes are cleared when standalone service is restarted ungracefully
String registrationZnode = String.format("/ledgers/available/%s:%d", baseConf.getAdvertisedAddress(), bookiePort);
@@ -257,7 +286,7 @@ public class LocalBookkeeperEnsemble {
bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE);
}
bs[i].start();
- LOG.debug("Local BK[{}] started (port: {}, data_directory: {})", i, initialPort + i,
+ LOG.debug("Local BK[{}] started (port: {}, data_directory: {})", i, bookiePort,
bkDataDir.getAbsolutePath());
}
}
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
index 95735af..a6dedaf 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
@@ -18,18 +18,18 @@
*/
package org.apache.pulsar.zookeeper;
-import java.io.File;
-
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.assertFalse;
+
+import java.io.File;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.commons.io.FileUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import org.apache.bookkeeper.test.PortManager;
-import org.apache.commons.io.FileUtils;
-import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@Test
public class LocalBookkeeperEnsembleTest {
@@ -62,16 +62,14 @@ public class LocalBookkeeperEnsembleTest {
final int numBk = 1;
final int zkPort = PortManager.nextFreePort();
- final int bkPort = PortManager.nextFreePort();
// Start local Bookies/ZooKeepers and confirm that they are running at specified ports
- LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk, zkPort, bkPort);
+ LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk, zkPort, () -> PortManager.nextFreePort());
ensemble.start();
assertTrue(ensemble.getZkServer().isRunning());
assertEquals(ensemble.getZkServer().getClientPort(), zkPort);
assertTrue(ensemble.getZkClient().getState().isConnected());
assertTrue(ensemble.getBookies()[0].isRunning());
- assertEquals(ensemble.getBookies()[0].getLocalAddress().getPort(), bkPort);
// Stop local Bookies/ZooKeepers and confirm that they are correctly closed
ensemble.stop();