You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by he...@apache.org on 2024/01/04 02:06:48 UTC
(pulsar) branch branch-3.0 updated: [fix][broker] Fixed the ExtensibleLoadManagerImpl internal system getTopic failure when the leadership changes #21764 (#21801)
This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 8c5a871c720 [fix][broker] Fixed the ExtensibleLoadManagerImpl internal system getTopic failure when the leadership changes #21764 (#21801)
8c5a871c720 is described below
commit 8c5a871c720a7f1efb83a4b6780be66935930903
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Wed Dec 27 07:47:36 2023 -0800
[fix][broker] Fixed the ExtensibleLoadManagerImpl internal system getTopic failure when the leadership changes #21764 (#21801)
---
.../extensions/ExtensibleLoadManagerImpl.java | 116 ++++++++++-----------
.../extensions/store/LoadDataStore.java | 17 +++
.../store/TableViewLoadDataStoreImpl.java | 30 +++++-
.../extensions/ExtensibleLoadManagerImplTest.java | 16 +--
.../extensions/filter/BrokerFilterTestBase.java | 15 +++
.../extensions/scheduler/TransferShedderTest.java | 30 ++++++
.../extensions/store/LoadDataStoreTest.java | 3 +
.../strategy/LeastResourceUsageWithWeightTest.java | 15 +++
.../loadbalance/ExtensibleLoadManagerTest.java | 47 ++++++---
9 files changed, 206 insertions(+), 83 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 37ca29da260..f717286fe5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -279,13 +279,18 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
log.info("Created topic {}.", topic);
} catch (PulsarAdminException.ConflictException ex) {
if (debug(pulsar.getConfiguration(), log)) {
- log.info("Topic {} already exists.", topic, ex);
+ log.info("Topic {} already exists.", topic);
}
} catch (PulsarAdminException e) {
throw new PulsarServerException(e);
}
}
+ private static void createSystemTopics(PulsarService pulsar) throws PulsarServerException {
+ createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
+ createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
+ }
+
@Override
public void start() throws PulsarServerException {
if (this.started) {
@@ -321,13 +326,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));
- createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
- createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
-
try {
this.brokerLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
- this.brokerLoadDataStore.startTableView();
this.topBundlesLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
} catch (LoadDataStoreException e) {
@@ -382,7 +383,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
this.unloadScheduler = new UnloadScheduler(
pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context,
serviceUnitStateChannel, unloadCounter, unloadMetrics);
- this.unloadScheduler.start();
this.splitScheduler = new SplitScheduler(
pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
this.splitScheduler.start();
@@ -740,74 +740,74 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
@VisibleForTesting
void playLeader() {
- if (role != Leader) {
- log.info("This broker:{} is changing the role from {} to {}",
- pulsar.getLookupServiceAddress(), role, Leader);
- int retry = 0;
- while (true) {
+ log.info("This broker:{} is setting the role from {} to {}",
+ pulsar.getLookupServiceAddress(), role, Leader);
+ int retry = 0;
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ initWaiter.await();
+ // Confirm the system topics have been created or create them if they do not exist.
+ // If the leader has changed, the new leader needs to reset
+ // the local brokerService.topics (through these topic creations).
+ // Otherwise, the system topic existence check will fail on the leader broker.
+ createSystemTopics(pulsar);
+ brokerLoadDataStore.init();
+ topBundlesLoadDataStore.init();
+ unloadScheduler.start();
+ serviceUnitStateChannel.scheduleOwnershipMonitor();
+ break;
+ } catch (Throwable e) {
+ log.error("The broker:{} failed to set the role. Retrying {} th ...",
+ pulsar.getLookupServiceAddress(), ++retry, e);
try {
- initWaiter.await();
- serviceUnitStateChannel.scheduleOwnershipMonitor();
- topBundlesLoadDataStore.startTableView();
- unloadScheduler.start();
- break;
- } catch (Throwable e) {
- log.error("The broker:{} failed to change the role. Retrying {} th ...",
- pulsar.getLookupServiceAddress(), ++retry, e);
- try {
- Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
- } catch (InterruptedException ex) {
- log.warn("Interrupted while sleeping.");
- }
+ Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
+ } catch (InterruptedException ex) {
+ log.warn("Interrupted while sleeping.");
+ // preserve thread's interrupt status
+ Thread.currentThread().interrupt();
}
}
- role = Leader;
- log.info("This broker:{} plays the leader now.", pulsar.getLookupServiceAddress());
}
+ role = Leader;
+ log.info("This broker:{} plays the leader now.", pulsar.getLookupServiceAddress());
// flush the load data when the leader is elected.
- if (brokerLoadDataReporter != null) {
- brokerLoadDataReporter.reportAsync(true);
- }
- if (topBundleLoadDataReporter != null) {
- topBundleLoadDataReporter.reportAsync(true);
- }
+ brokerLoadDataReporter.reportAsync(true);
+ topBundleLoadDataReporter.reportAsync(true);
}
@VisibleForTesting
void playFollower() {
- if (role != Follower) {
- log.info("This broker:{} is changing the role from {} to {}",
- pulsar.getLookupServiceAddress(), role, Follower);
- int retry = 0;
- while (true) {
+ log.info("This broker:{} is setting the role from {} to {}",
+ pulsar.getLookupServiceAddress(), role, Follower);
+ int retry = 0;
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ initWaiter.await();
+ unloadScheduler.close();
+ serviceUnitStateChannel.cancelOwnershipMonitor();
+ brokerLoadDataStore.init();
+ topBundlesLoadDataStore.close();
+ topBundlesLoadDataStore.startProducer();
+ break;
+ } catch (Throwable e) {
+ log.error("The broker:{} failed to set the role. Retrying {} th ...",
+ pulsar.getLookupServiceAddress(), ++retry, e);
try {
- initWaiter.await();
- serviceUnitStateChannel.cancelOwnershipMonitor();
- topBundlesLoadDataStore.closeTableView();
- unloadScheduler.close();
- break;
- } catch (Throwable e) {
- log.error("The broker:{} failed to change the role. Retrying {} th ...",
- pulsar.getLookupServiceAddress(), ++retry, e);
- try {
- Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
- } catch (InterruptedException ex) {
- log.warn("Interrupted while sleeping.");
- }
+ Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
+ } catch (InterruptedException ex) {
+ log.warn("Interrupted while sleeping.");
+ // preserve thread's interrupt status
+ Thread.currentThread().interrupt();
}
}
- role = Follower;
- log.info("This broker:{} plays a follower now.", pulsar.getLookupServiceAddress());
}
+ role = Follower;
+ log.info("This broker:{} plays a follower now.", pulsar.getLookupServiceAddress());
// flush the load data when the leader is elected.
- if (brokerLoadDataReporter != null) {
- brokerLoadDataReporter.reportAsync(true);
- }
- if (topBundleLoadDataReporter != null) {
- topBundleLoadDataReporter.reportAsync(true);
- }
+ brokerLoadDataReporter.reportAsync(true);
+ topBundleLoadDataReporter.reportAsync(true);
}
public List<Metrics> getMetrics() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
index 680a36523a2..a7deeeaad8a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
@@ -81,9 +81,26 @@ public interface LoadDataStore<T> extends Closeable {
*/
void closeTableView() throws IOException;
+
+ /**
+ * Starts the data store (both producer and table view).
+ */
+ void start() throws LoadDataStoreException;
+
+ /**
+ * Initializes the data store (closes it, then starts it again).
+ */
+ void init() throws IOException;
+
/**
* Starts the table view.
*/
void startTableView() throws LoadDataStoreException;
+
+ /**
+ * Starts the producer.
+ */
+ void startProducer() throws LoadDataStoreException;
+
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
index a400163ebf1..ead0a7081fd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -37,9 +37,9 @@ import org.apache.pulsar.client.api.TableView;
*/
public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
- private TableView<T> tableView;
+ private volatile TableView<T> tableView;
- private final Producer<T> producer;
+ private volatile Producer<T> producer;
private final PulsarClient client;
@@ -50,7 +50,6 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
public TableViewLoadDataStoreImpl(PulsarClient client, String topic, Class<T> clazz) throws LoadDataStoreException {
try {
this.client = client;
- this.producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create();
this.topic = topic;
this.clazz = clazz;
} catch (Exception e) {
@@ -99,6 +98,12 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
}
}
+ @Override
+ public void start() throws LoadDataStoreException {
+ startProducer();
+ startTableView();
+ }
+
@Override
public void startTableView() throws LoadDataStoreException {
if (tableView == null) {
@@ -111,14 +116,33 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
}
}
+ @Override
+ public void startProducer() throws LoadDataStoreException {
+ if (producer == null) {
+ try {
+ producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create();
+ } catch (PulsarClientException e) {
+ producer = null;
+ throw new LoadDataStoreException(e);
+ }
+ }
+ }
+
@Override
public void close() throws IOException {
if (producer != null) {
producer.close();
+ producer = null;
}
closeTableView();
}
+ @Override
+ public void init() throws IOException {
+ close();
+ start();
+ }
+
private void validateTableViewStart() {
if (tableView == null) {
throw new IllegalStateException("table view has not been started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index f499998fd3d..05a6e4e8762 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -802,12 +802,12 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStoreSecondarySpy, true);
if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
- primaryLoadManager.playFollower();
- primaryLoadManager.playFollower();
+ primaryLoadManager.playFollower(); // close 3 times
+ primaryLoadManager.playFollower(); // close 1 time
secondaryLoadManager.playLeader();
secondaryLoadManager.playLeader();
- primaryLoadManager.playLeader();
- primaryLoadManager.playLeader();
+ primaryLoadManager.playLeader(); // close 3 times and open 3 times
+ primaryLoadManager.playLeader(); // close 1 time and open 1 time,
secondaryLoadManager.playFollower();
secondaryLoadManager.playFollower();
} else {
@@ -822,10 +822,10 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
}
- verify(topBundlesLoadDataStorePrimarySpy, times(3)).startTableView();
- verify(topBundlesLoadDataStorePrimarySpy, times(3)).closeTableView();
- verify(topBundlesLoadDataStoreSecondarySpy, times(3)).startTableView();
- verify(topBundlesLoadDataStoreSecondarySpy, times(3)).closeTableView();
+ verify(topBundlesLoadDataStorePrimarySpy, times(4)).startTableView();
+ verify(topBundlesLoadDataStorePrimarySpy, times(8)).closeTableView();
+ verify(topBundlesLoadDataStoreSecondarySpy, times(4)).startTableView();
+ verify(topBundlesLoadDataStoreSecondarySpy, times(8)).closeTableView();
FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStorePrimary, true);
FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStoreSecondary, true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
index 68bd7b29094..a120ef473e9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
@@ -90,10 +90,25 @@ public class BrokerFilterTestBase {
}
+ @Override
+ public void start() throws LoadDataStoreException {
+
+ }
+
+ @Override
+ public void init() throws IOException {
+
+ }
+
@Override
public void startTableView() throws LoadDataStoreException {
}
+
+ @Override
+ public void startProducer() throws LoadDataStoreException {
+
+ }
};
configuration.setPreferLaterVersions(true);
doReturn(configuration).when(mockContext).brokerConfiguration();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 26d95a0158d..4eec6124777 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -383,10 +383,25 @@ public class TransferShedderTest {
}
+ @Override
+ public void start() throws LoadDataStoreException {
+
+ }
+
+ @Override
+ public void init() throws IOException {
+
+ }
+
@Override
public void startTableView() throws LoadDataStoreException {
}
+
+ @Override
+ public void startProducer() throws LoadDataStoreException {
+
+ }
};
var topBundleLoadDataStore = new LoadDataStore<TopBundlesLoadData>() {
@@ -436,10 +451,25 @@ public class TransferShedderTest {
}
+ @Override
+ public void start() throws LoadDataStoreException {
+
+ }
+
+ @Override
+ public void init() throws IOException {
+
+ }
+
@Override
public void startTableView() throws LoadDataStoreException {
}
+
+ @Override
+ public void startProducer() throws LoadDataStoreException {
+
+ }
};
BrokerRegistry brokerRegistry = mock(BrokerRegistry.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
index 184c337a47c..7431b9815f9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
@@ -75,6 +75,7 @@ public class LoadDataStoreTest extends MockedPulsarServiceBaseTest {
@Cleanup
LoadDataStore<MyClass> loadDataStore =
LoadDataStoreFactory.create(pulsar.getClient(), topic, MyClass.class);
+ loadDataStore.startProducer();
loadDataStore.startTableView();
MyClass myClass1 = new MyClass("1", 1);
loadDataStore.pushAsync("key1", myClass1).get();
@@ -108,6 +109,7 @@ public class LoadDataStoreTest extends MockedPulsarServiceBaseTest {
@Cleanup
LoadDataStore<Integer> loadDataStore =
LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class);
+ loadDataStore.startProducer();
loadDataStore.startTableView();
Map<String, Integer> map = new HashMap<>();
@@ -132,6 +134,7 @@ public class LoadDataStoreTest extends MockedPulsarServiceBaseTest {
String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
LoadDataStore<Integer> loadDataStore =
LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class);
+ loadDataStore.startProducer();
loadDataStore.startTableView();
loadDataStore.pushAsync("1", 1).get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
index 0eea1d87513..b1e09bf2f3a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
@@ -252,10 +252,25 @@ public class LeastResourceUsageWithWeightTest {
}
+ @Override
+ public void start() throws LoadDataStoreException {
+
+ }
+
+ @Override
+ public void init() throws IOException {
+
+ }
+
@Override
public void startTableView() throws LoadDataStoreException {
}
+
+ @Override
+ public void startProducer() throws LoadDataStoreException {
+
+ }
};
doReturn(conf).when(ctx).brokerConfiguration();
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 49e5ae37834..23abf50bdb0 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -38,6 +38,7 @@ import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
@@ -53,6 +54,7 @@ import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -309,23 +311,40 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");
- List<String> activeBrokers = admin.brokers().getActiveBrokers();
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
+ () -> {
+ List<String> activeBrokers = admin.brokers().getActiveBrokers();
+ assertEquals(activeBrokers.size(), NUM_BROKERS);
+ }
+ );
+ try {
+ admin.namespaces().createNamespace(isolationEnabledNameSpace);
+ } catch (PulsarAdminException.ConflictException e) {
+ //expected when retried
+ }
- assertEquals(activeBrokers.size(), NUM_BROKERS);
+ try {
+ admin.clusters()
+ .createNamespaceIsolationPolicy(clusterName, namespaceIsolationPolicyName, NamespaceIsolationData
+ .builder()
+ .namespaces(List.of(isolationEnabledNameSpace))
+ .autoFailoverPolicy(AutoFailoverPolicyData.builder()
+ .policyType(AutoFailoverPolicyType.min_available)
+ .parameters(parameters1)
+ .build())
+ .primary(List.of(getHostName(0)))
+ .secondary(List.of(getHostName(1)))
+ .build());
+ } catch (PulsarAdminException.ConflictException e) {
+ //expected when retried
+ }
- admin.namespaces().createNamespace(isolationEnabledNameSpace);
- admin.clusters().createNamespaceIsolationPolicy(clusterName, namespaceIsolationPolicyName, NamespaceIsolationData
- .builder()
- .namespaces(List.of(isolationEnabledNameSpace))
- .autoFailoverPolicy(AutoFailoverPolicyData.builder()
- .policyType(AutoFailoverPolicyType.min_available)
- .parameters(parameters1)
- .build())
- .primary(List.of(getHostName(0)))
- .secondary(List.of(getHostName(1)))
- .build());
final String topic = "persistent://" + isolationEnabledNameSpace + "/topic";
- admin.topics().createNonPartitionedTopic(topic);
+ try {
+ admin.topics().createNonPartitionedTopic(topic);
+ } catch (PulsarAdminException.ConflictException e) {
+ //expected when retried
+ }
String broker = admin.lookups().lookupTopic(topic);