You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/02 20:15:26 UTC

[GitHub] jai1 closed pull request #1715: Broker should not start replicator for root partitioned-topic

jai1 closed pull request #1715: Broker should not start replicator for root partitioned-topic
URL: https://github.com/apache/incubator-pulsar/pull/1715
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer displays
its diff after the merge, so the diff is supplied below:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 1913dd5ff3..d5480ccc8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -18,17 +18,22 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.web.PulsarWebResource.path;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.DestinationName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,8 +62,9 @@
         Stopped, Starting, Started, Stopping
     }
 
-    public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster,
-            String remoteCluster, BrokerService brokerService) {
+    public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster,
+            BrokerService brokerService) throws NamingException {
+        validatePartitionedTopic(topicName, brokerService);
         this.brokerService = brokerService;
         this.topicName = topicName;
         this.replicatorPrefix = replicatorPrefix;
@@ -67,7 +73,6 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca
         this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
         this.producer = null;
         this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
-
         this.producerConfiguration = new ProducerConfiguration();
         this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS);
         this.producerConfiguration.setMaxPendingMessages(producerQueueSize);
@@ -214,5 +219,42 @@ public static String getReplicatorName(String replicatorPrefix, String cluster)
         return (replicatorPrefix + "." + cluster).intern();
     }
 
+    /**
+     * Replication can't be started on root-partitioned-topic to avoid producer startup conflict.
+     * 
+     * <pre>
+     * eg:
+     * if topic persistent://prop/cluster/ns/my-topic is a partitioned topic with 2 partitions, then the
+     * broker explicitly creates replicator producers for "my-topic-partition-0" and "my-topic-partition-1".
+     *
+     * However, if the broker tries to start a producer on the root topic "my-topic", the client library
+     * internally creates individual producers for "my-topic-partition-0" and "my-topic-partition-1", which
+     * conflict with the existing replicator producers.
+     * </pre>
+     * 
+     * Therefore, a replicator can't be started on the root partitioned-topic, because that would internally create
+     * multiple per-partition producers.
+     * 
+     * @param topicName
+     * @param brokerService
+     */
+    private void validatePartitionedTopic(String topicName, BrokerService brokerService) throws NamingException {
+        DestinationName destination = DestinationName.get(topicName);
+        String partitionedTopicPath = path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE,
+                destination.getNamespace().toString(), destination.getDomain().toString(),
+                destination.getEncodedLocalName());
+        boolean isPartitionedTopic = false;
+        try {
+            isPartitionedTopic = brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .get(partitionedTopicPath).isPresent();
+        } catch (Exception e) {
+            log.warn("Failed to verify partitioned topic {}-{}", topicName, e.getMessage());
+        }
+        if (isPartitionedTopic) {
+            throw new NamingException(
+                    topicName + " is a partitioned-topic and replication can't be started for partitioned-producer ");
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(AbstractReplicator.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2f3b27a75b..1b5cadee84 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -65,6 +65,7 @@
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -583,29 +584,37 @@ private void createPersistentTopic(final String topic, CompletableFuture<Topic>
                     new OpenLedgerCallback() {
                         @Override
                         public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
-                            PersistentTopic persistentTopic = new PersistentTopic(topic, ledger, BrokerService.this);
-
-                            CompletableFuture<Void> replicationFuture = persistentTopic.checkReplication();
-                            replicationFuture.thenCompose(v -> {
-                                // Also check dedup status
-                                return persistentTopic.checkDeduplicationStatus();
-                            }).thenRun(() -> {
-                                log.info("Created topic {} - dedup is {}", topic,
-                                        persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
-                                long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
-                                        - topicCreateTimeMs;
-                                pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
-                                addTopicToStatsMaps(destinationName, persistentTopic);
-                                topicFuture.complete(persistentTopic);
-                            }).exceptionally((ex) -> {
-                                log.warn("Replication or dedup check failed. Removing topic from topics list {}, {}", topic, ex);
-                                persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
-                                    topics.remove(topic, topicFuture);
-                                    topicFuture.completeExceptionally(ex);
+                            try {
+                                PersistentTopic persistentTopic = new PersistentTopic(topic, ledger,
+                                        BrokerService.this);
+                                CompletableFuture<Void> replicationFuture = persistentTopic.checkReplication();
+                                replicationFuture.thenCompose(v -> {
+                                    // Also check dedup status
+                                    return persistentTopic.checkDeduplicationStatus();
+                                }).thenRun(() -> {
+                                    log.info("Created topic {} - dedup is {}", topic,
+                                            persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
+                                    long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
+                                            - topicCreateTimeMs;
+                                    pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
+                                    addTopicToStatsMaps(destinationName, persistentTopic);
+                                    topicFuture.complete(persistentTopic);
+                                }).exceptionally((ex) -> {
+                                    log.warn(
+                                            "Replication or dedup check failed. Removing topic from topics list {}, {}",
+                                            topic, ex);
+                                    persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+                                        topics.remove(topic, topicFuture);
+                                        topicFuture.completeExceptionally(ex);
+                                    });
+
+                                    return null;
                                 });
-
-                                return null;
-                            });
+                            } catch (NamingException e) {
+                                log.warn("Failed to create topic {}-{}", topic, e.getMessage());
+                                pulsar.getExecutor().submit(() -> topics.remove(topic, topicFuture));
+                                topicFuture.completeExceptionally(e);
+                            }
                         }
 
                         @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index d914aa7ece..44bb27d4e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -26,6 +26,7 @@
 import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.client.api.MessageId;
@@ -49,7 +50,7 @@
     private final NonPersistentReplicatorStats stats = new NonPersistentReplicatorStats();
 
     public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
-            BrokerService brokerService) {
+            BrokerService brokerService) throws NamingException {
         super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);
 
         producerConfiguration.setBlockIfQueueFull(false);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 44a9e14434..511e7f8535 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -270,17 +270,6 @@ private boolean hasLocalProducers() {
         return foundLocal.get();
     }
 
-    private boolean hasRemoteProducers() {
-        AtomicBoolean foundRemote = new AtomicBoolean(false);
-        producers.forEach(producer -> {
-            if (producer.isRemote()) {
-                foundRemote.set(true);
-            }
-        });
-
-        return foundRemote.get();
-    }
-
     @Override
     public void removeProducer(Producer producer) {
         checkArgument(producer.getTopic() == this);
@@ -520,7 +509,12 @@ void removeSubscription(String subscriptionName) {
             }
 
             if (!replicators.containsKey(cluster)) {
-                startReplicator(cluster);
+                if (!startReplicator(cluster)) {
+                    // this happens when the global topic is a partitioned topic: a replicator can't start on the
+                    // root topic name (the topic name without a "-partition-N" suffix)
+                    return FutureUtil
+                            .failedFuture(new NamingException(topic + " failed to start replicator for " + cluster));
+                }
             }
         }
 
@@ -535,13 +529,30 @@ void removeSubscription(String subscriptionName) {
         return FutureUtil.waitForAll(futures);
     }
 
-    void startReplicator(String remoteCluster) {
+    boolean startReplicator(String remoteCluster) {
         log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
         String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
-        replicators.computeIfAbsent(remoteCluster,
-                r -> new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService));
+        return addReplicationCluster(remoteCluster,NonPersistentTopic.this, localCluster);
     }
 
+    protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic, String localCluster) {
+        AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
+        replicators.computeIfAbsent(remoteCluster, r -> {
+            try {
+                return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService);
+            } catch (NamingException e) {
+                isReplicatorStarted.set(false);
+                log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
+            }
+            return null;
+        });
+        // clean up the replicator entry if startup failed
+        if (!isReplicatorStarted.get()) {
+            replicators.remove(remoteCluster);
+        }
+        return isReplicatorStarted.get();
+    }
+    
     CompletableFuture<Void> removeReplicator(String remoteCluster) {
         log.info("[{}] Removing replicator to {}", topic, remoteCluster);
         final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -913,6 +924,8 @@ public void markBatchMessagePublished() {
         this.hasBatchMessagePublished = true;
     }
 
+    
+    
     private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
 
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 62118190ff..52ed4b4532 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -38,6 +38,7 @@
 import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.Backoff;
@@ -89,7 +90,7 @@
     private final ReplicatorStats stats = new ReplicatorStats();
 
     public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
-            BrokerService brokerService) {
+            BrokerService brokerService) throws NamingException {
         super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);
         this.topic = topic;
         this.cursor = cursor;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 20ae52726e..9477486c5e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -193,7 +193,7 @@ public void reset() {
         }
     }
 
-    public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
+    public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) throws NamingException {
         this.topic = topic;
         this.ledger = ledger;
         this.brokerService = brokerService;
@@ -212,8 +212,11 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
             if (cursor.getName().startsWith(replicatorPrefix)) {
                 String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
                 String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
-                replicators.put(remoteCluster,
-                        new PersistentReplicator(this, cursor, localCluster, remoteCluster, brokerService));
+                boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor, localCluster);
+                if (!isReplicatorStarted) {
+                    throw new NamingException(
+                            PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster);
+                }
             } else if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)) {
                 // This is not a regular subscription, we are going to ignore it for now and let the message dedup logic
                 // to take care of it
@@ -882,9 +885,13 @@ public void checkMessageDeduplicationInfo() {
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
-                replicators.computeIfAbsent(remoteCluster, r -> new PersistentReplicator(PersistentTopic.this, cursor, localCluster,
-                        remoteCluster, brokerService));
-                future.complete(null);
+                boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, cursor, localCluster);
+                if (isReplicatorStarted) {
+                    future.complete(null);    
+                } else {
+                    future.completeExceptionally(new NamingException(
+                            PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
+                }
             }
 
             @Override
@@ -897,6 +904,26 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
         return future;
     }
 
+    protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, ManagedCursor cursor,
+            String localCluster) {
+        AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
+        replicators.computeIfAbsent(remoteCluster, r -> {
+            try {
+                return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
+                        brokerService);
+            } catch (NamingException e) {
+                isReplicatorStarted.set(false);
+                log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
+            }
+            return null;
+        });
+        // clean up the replicator entry if startup failed
+        if (!isReplicatorStarted.get()) {
+            replicators.remove(remoteCluster);
+        }
+        return isReplicatorStarted.get();
+    }
+
     CompletableFuture<Void> removeReplicator(String remoteCluster) {
         log.info("[{}] Removing replicator to {}", topic, remoteCluster);
         final CompletableFuture<Void> future = new CompletableFuture<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index cbef1d0548..922f6ee67e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -48,6 +48,7 @@
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.namespace.OwnedBundle;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.checksum.utils.Crc32cChecksum;
@@ -74,6 +75,8 @@
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
@@ -84,6 +87,14 @@
  */
 public class ReplicatorTest extends ReplicatorTestBase {
 
+    protected String methodName;
+
+    @BeforeMethod
+    public void beforeMethod(Method m) throws Exception {
+        methodName = m.getName();
+    }
+
+    
     @Override
     @BeforeClass(timeOut = 30000)
     void setup() throws Exception {
@@ -102,6 +113,11 @@ void shutdown() throws Exception {
         });
     }
 
+    @DataProvider(name = "partitionedTopic")
+    public Object[][] partitionedTopicProvider() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+    
     @Test(enabled = true, timeOut = 30000)
     public void testConfigChange() throws Exception {
         log.info("--- Starting ReplicatorTest::testConfigChange ---");
@@ -856,6 +872,62 @@ public void verifyChecksumAfterReplication() throws Exception {
         reader2.closeAsync().get();
     }
 
+    /**
+     * Verifies that the broker does not start a replicator on a root partitioned-topic (no -partition-N suffix).
+     * 
+     * @param isPartitionedTopic
+     * @throws Exception
+     */
+    @Test(dataProvider = "partitionedTopic")
+    public void testReplicatorOnPartitionedTopic(boolean isPartitionedTopic) throws Exception {
+
+        log.info("--- Starting ReplicatorTest::{} --- ", methodName);
+
+        final String namespace = "pulsar/global/partitionedNs-" + isPartitionedTopic;
+        final String persistentTopicName = "persistent://" + namespace + "/partTopic-" + isPartitionedTopic;
+        final String nonPersistentTopicName = "non-persistent://" + namespace + "/partTopic-" + isPartitionedTopic;
+        BrokerService brokerService = pulsar1.getBrokerService();
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Lists.newArrayList("r1", "r2", "r3"));
+
+        if (isPartitionedTopic) {
+            admin1.persistentTopics().createPartitionedTopic(persistentTopicName, 5);
+            admin1.nonPersistentTopics().createPartitionedTopic(nonPersistentTopicName, 5);
+        }
+
+        // load namespace with dummy topic on ns
+        PulsarClient client = PulsarClient.create(url1.toString());
+        client.createProducer("persistent://" + namespace + "/dummyTopic");
+
+        // persistent topic test
+        try {
+            brokerService.getTopic(persistentTopicName).get();
+            if (isPartitionedTopic) {
+                fail("Topic creation fails with partitioned topic as replicator init fails");
+            }
+        } catch (Exception e) {
+            if (!isPartitionedTopic) {
+                fail("Topic creation should not fail without any partitioned topic");
+            }
+            assertTrue(e.getCause() instanceof NamingException);
+        }
+
+        // non-persistent topic test
+        try {
+            brokerService.getTopic(nonPersistentTopicName).get();
+            if (isPartitionedTopic) {
+                fail("Topic creation fails with partitioned topic as replicator init fails");
+            }
+        } catch (Exception e) {
+            if (!isPartitionedTopic) {
+                fail("Topic creation should not fail without any partitioned topic");
+            }
+            assertTrue(e.getCause() instanceof NamingException);
+        }
+
+    }
+    
     private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
 
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 344ad23a0f..4320428b3b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -132,6 +132,7 @@ void setup() throws Exception {
         config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
         config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+        config1.setDefaultNumberOfNamespaceBundles(1);
         pulsar1 = new PulsarService(config1);
         pulsar1.start();
         ns1 = pulsar1.getBrokerService();
@@ -165,6 +166,7 @@ void setup() throws Exception {
         config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
         config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+        config2.setDefaultNumberOfNamespaceBundles(1);
         pulsar2 = new PulsarService(config2);
         pulsar2.start();
         ns2 = pulsar2.getBrokerService();
@@ -197,6 +199,7 @@ void setup() throws Exception {
         config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config3.setDefaultNumberOfNamespaceBundles(1);
         pulsar3 = new PulsarService(config3);
         pulsar3.start();
         ns3 = pulsar3.getBrokerService();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services