You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2018/02/23 02:31:01 UTC
[incubator-pulsar] branch master updated: Broker should not start
replicator for root partitioned-topic (#1262)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f38a003 Broker should not start replicator for root partitioned-topic (#1262)
f38a003 is described below
commit f38a003a2a8331da94996fc7ea871abfc779ae24
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu Feb 22 18:30:58 2018 -0800
Broker should not start replicator for root partitioned-topic (#1262)
* Broker should not start replicator for root partitioned-topic
* address comment
---
.../pulsar/broker/service/AbstractReplicator.java | 51 +++++++++++++--
.../pulsar/broker/service/BrokerService.java | 53 +++++++++-------
.../nonpersistent/NonPersistentReplicator.java | 3 +-
.../service/nonpersistent/NonPersistentTopic.java | 35 +++++++++--
.../service/persistent/PersistentReplicator.java | 3 +-
.../broker/service/persistent/PersistentTopic.java | 39 ++++++++++--
.../pulsar/broker/service/ReplicatorTest.java | 72 ++++++++++++++++++++++
.../pulsar/broker/service/ReplicatorTestBase.java | 3 +
8 files changed, 220 insertions(+), 39 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 49213c9..4642a85 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -18,17 +18,21 @@
*/
package org.apache.pulsar.broker.service;
+import static org.apache.pulsar.broker.web.PulsarWebResource.path;
+
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.Position;
-import org.apache.pulsar.broker.service.AbstractReplicator.State;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.DestinationName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,8 +61,9 @@ public abstract class AbstractReplicator {
Stopped, Starting, Started, Stopping
}
- public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster,
- String remoteCluster, BrokerService brokerService) {
+ public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster,
+ BrokerService brokerService) throws NamingException {
+ validatePartitionedTopic(topicName, brokerService);
this.brokerService = brokerService;
this.topicName = topicName;
this.replicatorPrefix = replicatorPrefix;
@@ -69,8 +74,7 @@ public abstract class AbstractReplicator {
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
this.producerBuilder = client.newProducer() //
- .topic(topicName)
- .sendTimeout(0, TimeUnit.SECONDS) //
+ .topic(topicName).sendTimeout(0, TimeUnit.SECONDS) //
.maxPendingMessages(producerQueueSize) //
.producerName(getReplicatorName(replicatorPrefix, localCluster));
STATE_UPDATER.set(this, State.Stopped);
@@ -211,5 +215,42 @@ public abstract class AbstractReplicator {
return (replicatorPrefix + "." + cluster).intern();
}
+ /**
+ * Replication can't be started on a root partitioned-topic, to avoid producer startup conflicts.
+ *
+ * <pre>
+ * eg:
+ * if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic with 2 partitions then
+ * broker explicitly creates replicator producer for: "my-topic-partition-0" and "my-topic-partition-1".
+ *
+ * However, if broker tries to start producer with root topic "my-topic" then client-lib internally creates individual
+ * producers for "my-topic-partition-0" and "my-topic-partition-1" which creates conflict with existing
+ * replicator producers.
+ * </pre>
+ *
+ * A failed lookup is only logged as a warning; the topic is then treated as non-partitioned and accepted.
+ *
+ * @param topicName full topic name whose partitioned-topic path is looked up in the configuration cache
+ * @param brokerService broker service used to reach the configuration cache
+ * @throws NamingException if an entry is present at the partitioned-topic path derived from topicName
+ */
+ private void validatePartitionedTopic(String topicName, BrokerService brokerService) throws NamingException {
+ DestinationName destination = DestinationName.get(topicName);
+ String partitionedTopicPath = path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE,
+ destination.getNamespace().toString(), destination.getDomain().toString(),
+ destination.getEncodedLocalName());
+ boolean isPartitionedTopic = false;
+ try {
+ isPartitionedTopic = brokerService.pulsar().getConfigurationCache().policiesCache()
+ .get(partitionedTopicPath).isPresent();
+ } catch (Exception e) {
+ log.warn("Failed to verify partitioned topic {}-{}", topicName, e.getMessage());
+ }
+ if (isPartitionedTopic) {
+ throw new NamingException(
+ topicName + " is a partitioned-topic and replication can't be started for partitioned-producer ");
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(AbstractReplicator.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1eff83b..c12527d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -65,6 +65,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -583,29 +584,37 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
new OpenLedgerCallback() {
@Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
- PersistentTopic persistentTopic = new PersistentTopic(topic, ledger, BrokerService.this);
-
- CompletableFuture<Void> replicationFuture = persistentTopic.checkReplication();
- replicationFuture.thenCompose(v -> {
- // Also check dedup status
- return persistentTopic.checkDeduplicationStatus();
- }).thenRun(() -> {
- log.info("Created topic {} - dedup is {}", topic,
- persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
- long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
- - topicCreateTimeMs;
- pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
- addTopicToStatsMaps(destinationName, persistentTopic);
- topicFuture.complete(persistentTopic);
- }).exceptionally((ex) -> {
- log.warn("Replication or dedup check failed. Removing topic from topics list {}, {}", topic, ex);
- persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
- topics.remove(topic, topicFuture);
- topicFuture.completeExceptionally(ex);
+ try {
+ PersistentTopic persistentTopic = new PersistentTopic(topic, ledger,
+ BrokerService.this);
+ CompletableFuture<Void> replicationFuture = persistentTopic.checkReplication();
+ replicationFuture.thenCompose(v -> {
+ // Also check dedup status
+ return persistentTopic.checkDeduplicationStatus();
+ }).thenRun(() -> {
+ log.info("Created topic {} - dedup is {}", topic,
+ persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
+ long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
+ - topicCreateTimeMs;
+ pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
+ addTopicToStatsMaps(destinationName, persistentTopic);
+ topicFuture.complete(persistentTopic);
+ }).exceptionally((ex) -> {
+ log.warn(
+ "Replication or dedup check failed. Removing topic from topics list {}, {}",
+ topic, ex);
+ persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+ topics.remove(topic, topicFuture);
+ topicFuture.completeExceptionally(ex);
+ });
+
+ return null;
});
-
- return null;
- });
+ } catch (NamingException e) {
+ log.warn("Failed to create topic {}-{}", topic, e.getMessage());
+ pulsar.getExecutor().submit(() -> topics.remove(topic, topicFuture));
+ topicFuture.completeExceptionally(e);
+ }
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index e9219a6..74af8b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -26,6 +26,7 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.client.api.MessageId;
@@ -49,7 +50,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
private final NonPersistentReplicatorStats stats = new NonPersistentReplicatorStats();
public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
- BrokerService brokerService) {
+ BrokerService brokerService) throws NamingException {
super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);
producerBuilder.blockIfQueueFull(false);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index fdbadc6..5941768 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
@@ -49,6 +50,8 @@ import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyE
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
+import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
@@ -535,7 +538,12 @@ public class NonPersistentTopic implements Topic {
}
if (!replicators.containsKey(cluster)) {
- startReplicator(cluster);
+ if (!startReplicator(cluster)) {
+ // this happens when the global topic is a partitioned topic: a replicator can't be started on the
+ // root topic name (the topic name without the -partition suffix)
+ return FutureUtil
+ .failedFuture(new NamingException(topic + " failed to start replicator for " + cluster));
+ }
}
}
@@ -550,13 +558,30 @@ public class NonPersistentTopic implements Topic {
return FutureUtil.waitForAll(futures);
}
- void startReplicator(String remoteCluster) {
+ boolean startReplicator(String remoteCluster) {
log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
- replicators.computeIfAbsent(remoteCluster,
- r -> new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService));
+ return addReplicationCluster(remoteCluster,NonPersistentTopic.this, localCluster);
}
+ protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic, String localCluster) {
+ AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
+ replicators.computeIfAbsent(remoteCluster, r -> {
+ try {
+ return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService);
+ } catch (NamingException e) {
+ isReplicatorStarted.set(false);
+ log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
+ }
+ return null;
+ });
+ // replicator construction failed: drop any entry left under this cluster key and report the failure
+ if (!isReplicatorStarted.get()) {
+ replicators.remove(remoteCluster);
+ }
+ return isReplicatorStarted.get();
+ }
+
CompletableFuture<Void> removeReplicator(String remoteCluster) {
log.info("[{}] Removing replicator to {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -941,5 +966,7 @@ public class NonPersistentTopic implements Topic {
this.hasBatchMessagePublished = true;
}
+
+
private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 6211819..52ed4b4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.Backoff;
@@ -89,7 +90,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
private final ReplicatorStats stats = new ReplicatorStats();
public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
- BrokerService brokerService) {
+ BrokerService brokerService) throws NamingException {
super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);
this.topic = topic;
this.cursor = cursor;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0ce3925..88813ea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -194,7 +194,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
}
}
- public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
+ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) throws NamingException {
this.topic = topic;
this.ledger = ledger;
this.brokerService = brokerService;
@@ -213,8 +213,11 @@ public class PersistentTopic implements Topic, AddEntryCallback {
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
- replicators.put(remoteCluster,
- new PersistentReplicator(this, cursor, localCluster, remoteCluster, brokerService));
+ boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor, localCluster);
+ if (!isReplicatorStarted) {
+ throw new NamingException(
+ PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster);
+ }
} else if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)) {
// This is not a regular subscription, we are going to ignore it for now and let the message dedup logic
// to take care of it
@@ -896,9 +899,13 @@ public class PersistentTopic implements Topic, AddEntryCallback {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
- replicators.computeIfAbsent(remoteCluster, r -> new PersistentReplicator(PersistentTopic.this, cursor, localCluster,
- remoteCluster, brokerService));
- future.complete(null);
+ boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, cursor, localCluster);
+ if (isReplicatorStarted) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(new NamingException(
+ PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
+ }
}
@Override
@@ -911,6 +918,26 @@ public class PersistentTopic implements Topic, AddEntryCallback {
return future;
}
+ protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, ManagedCursor cursor,
+ String localCluster) {
+ AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
+ replicators.computeIfAbsent(remoteCluster, r -> {
+ try {
+ return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
+ brokerService);
+ } catch (NamingException e) {
+ isReplicatorStarted.set(false);
+ log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
+ }
+ return null;
+ });
+ // replicator construction failed: drop any entry left under this cluster key and report the failure
+ if (!isReplicatorStarted.get()) {
+ replicators.remove(remoteCluster);
+ }
+ return isReplicatorStarted.get();
+ }
+
CompletableFuture<Void> removeReplicator(String remoteCluster) {
log.info("[{}] Removing replicator to {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index cbef1d0..922f6ee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -48,6 +48,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnedBundle;
import org.apache.pulsar.broker.namespace.OwnershipCache;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.checksum.utils.Crc32cChecksum;
@@ -74,6 +75,8 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Lists;
@@ -84,6 +87,14 @@ import io.netty.buffer.ByteBuf;
*/
public class ReplicatorTest extends ReplicatorTestBase {
+ protected String methodName;
+
+ @BeforeMethod
+ public void beforeMethod(Method m) throws Exception {
+ methodName = m.getName();
+ }
+
+
@Override
@BeforeClass(timeOut = 30000)
void setup() throws Exception {
@@ -102,6 +113,11 @@ public class ReplicatorTest extends ReplicatorTestBase {
});
}
+ @DataProvider(name = "partitionedTopic")
+ public Object[][] partitionedTopicProvider() {
+ return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+ }
+
@Test(enabled = true, timeOut = 30000)
public void testConfigChange() throws Exception {
log.info("--- Starting ReplicatorTest::testConfigChange ---");
@@ -856,6 +872,62 @@ public class ReplicatorTest extends ReplicatorTestBase {
reader2.closeAsync().get();
}
+ /**
+ * Verifies that the broker does not start a replicator on a root partitioned-topic (topic name without the
+ * -partition suffix): loading such a topic must fail with a NamingException as the cause.
+ * @param isPartitionedTopic whether the persistent and non-persistent test topics are created as partitioned
+ * @throws Exception
+ */
+ @Test(dataProvider = "partitionedTopic")
+ public void testReplicatorOnPartitionedTopic(boolean isPartitionedTopic) throws Exception {
+
+ log.info("--- Starting ReplicatorTest::{} --- ", methodName);
+
+ final String namespace = "pulsar/global/partitionedNs-" + isPartitionedTopic;
+ final String persistentTopicName = "persistent://" + namespace + "/partTopic-" + isPartitionedTopic;
+ final String nonPersistentTopicName = "non-persistent://" + namespace + "/partTopic-" + isPartitionedTopic;
+ BrokerService brokerService = pulsar1.getBrokerService();
+
+ admin1.namespaces().createNamespace(namespace);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Lists.newArrayList("r1", "r2", "r3"));
+
+ if (isPartitionedTopic) {
+ admin1.persistentTopics().createPartitionedTopic(persistentTopicName, 5);
+ admin1.nonPersistentTopics().createPartitionedTopic(nonPersistentTopicName, 5);
+ }
+
+ // create a producer on a dummy topic in the namespace so that the namespace gets loaded
+ PulsarClient client = PulsarClient.create(url1.toString());
+ client.createProducer("persistent://" + namespace + "/dummyTopic");
+
+ // persistent topic: loading the root topic must fail if and only if it is partitioned
+ try {
+ brokerService.getTopic(persistentTopicName).get();
+ if (isPartitionedTopic) {
+ fail("Topic creation fails with partitioned topic as replicator init fails");
+ }
+ } catch (Exception e) {
+ if (!isPartitionedTopic) {
+ fail("Topic creation should not fail without any partitioned topic");
+ }
+ assertTrue(e.getCause() instanceof NamingException);
+ }
+
+ // non-persistent topic: same expectation as for the persistent topic above
+ try {
+ brokerService.getTopic(nonPersistentTopicName).get();
+ if (isPartitionedTopic) {
+ fail("Topic creation fails with partitioned topic as replicator init fails");
+ }
+ } catch (Exception e) {
+ if (!isPartitionedTopic) {
+ fail("Topic creation should not fail without any partitioned topic");
+ }
+ assertTrue(e.getCause() instanceof NamingException);
+ }
+
+ }
+
private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 344ad23..4320428 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -132,6 +132,7 @@ public class ReplicatorTestBase {
config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+ config1.setDefaultNumberOfNamespaceBundles(1);
pulsar1 = new PulsarService(config1);
pulsar1.start();
ns1 = pulsar1.getBrokerService();
@@ -165,6 +166,7 @@ public class ReplicatorTestBase {
config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+ config2.setDefaultNumberOfNamespaceBundles(1);
pulsar2 = new PulsarService(config2);
pulsar2.start();
ns2 = pulsar2.getBrokerService();
@@ -197,6 +199,7 @@ public class ReplicatorTestBase {
config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
+ config3.setDefaultNumberOfNamespaceBundles(1);
pulsar3 = new PulsarService(config3);
pulsar3.start();
ns3 = pulsar3.getBrokerService();
--
To stop receiving notification emails like this one, please contact
rdhabalia@apache.org.