You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/07 02:28:06 UTC

[pulsar] branch branch-2.7 updated: Fix master broker while subscribe to non-persistent partitioned topic without topic auto-creation (#9107)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 9897c7d  Fix master broker while subscribe to non-persistent partitioned topic without topic auto-creation (#9107)
9897c7d is described below

commit 9897c7db5b8788e25710fd3dc7a2b693d4bc9b41
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Jan 3 10:48:55 2021 +0800

    Fix master broker while subscribe to non-persistent partitioned topic without topic auto-creation (#9107)
    
    ### Motivation
    
    After #9029 was merged, a non-persistent topic can be created when topic auto-creation is enabled; otherwise, the client will get a `Topic does not exist` exception. This looks like an issue caused by concurrent merges, but I was not able to find another PR related to it.
    
    This PR fixes the test that subscribes to a partitioned non-persistent topic while topic auto-creation is disabled. For now, the fix is to enable topic auto-creation for that test. For non-persistent topics, we don't persist any metadata in the metadata server, so users who want to use non-persistent topics must enable topic auto-creation.
    
    (cherry picked from commit 21a3a3b5376fee8afeb01c5decfbdee49ae26a0c)
---
 .../pulsar/broker/service/BrokerService.java       | 27 ++++++++++++----------
 .../pulsar/client/api/PartitionCreationTest.java   | 14 +++++------
 2 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 32bf07e..6330e36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1558,19 +1558,22 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             synchronized (multiLayerTopicsMap) {
                 ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> namespaceMap = multiLayerTopicsMap
                         .get(namespaceName);
-                ConcurrentOpenHashMap<String, Topic> bundleMap = namespaceMap.get(bundleName);
-                bundleMap.remove(topic);
-                if (bundleMap.isEmpty()) {
-                    namespaceMap.remove(bundleName);
-                }
+                if (namespaceMap != null) {
+                    ConcurrentOpenHashMap<String, Topic> bundleMap = namespaceMap.get(bundleName);
+                    bundleMap.remove(topic);
+                    if (bundleMap.isEmpty()) {
+                        namespaceMap.remove(bundleName);
+                    }
 
-                if (namespaceMap.isEmpty()) {
-                    multiLayerTopicsMap.remove(namespaceName);
-                    final ClusterReplicationMetrics clusterReplicationMetrics = pulsarStats
-                            .getClusterReplicationMetrics();
-                    replicationClients.forEach((cluster, client) -> {
-                        clusterReplicationMetrics.remove(clusterReplicationMetrics.getKeyName(namespaceName, cluster));
-                    });
+                    if (namespaceMap.isEmpty()) {
+                        multiLayerTopicsMap.remove(namespaceName);
+                        final ClusterReplicationMetrics clusterReplicationMetrics = pulsarStats
+                                .getClusterReplicationMetrics();
+                        replicationClients.forEach((cluster, client) -> {
+                            clusterReplicationMetrics.remove(clusterReplicationMetrics.getKeyName(namespaceName,
+                                    cluster));
+                        });
+                    }
                 }
             }
         } catch (Exception e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
index a217d05..dfdb25f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
@@ -63,15 +63,15 @@ public class PartitionCreationTest extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
-    @Test(dataProvider = "topicDomainProvider")
+    @Test(dataProvider = "topicDomainProvider", timeOut = 60000)
     public void testCreateConsumerForPartitionedTopicWhenDisableTopicAutoCreation(TopicDomain domain) throws PulsarAdminException, PulsarClientException {
-        conf.setAllowAutoTopicCreation(false);
+        conf.setAllowAutoTopicCreation(domain.equals(TopicDomain.non_persistent));
         final String topic = domain.value() + "://public/default/testCreateConsumerWhenDisableTopicAutoCreation";
         admin.topics().createPartitionedTopic(topic, 3);
         Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe());
     }
 
-    @Test(dataProvider = "topicDomainProvider")
+    @Test(dataProvider = "topicDomainProvider", timeOut = 60000)
     public void testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation(TopicDomain domain) throws PulsarClientException {
         conf.setAllowAutoTopicCreation(false);
         final String topic = domain.value() + "://public/default/testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation";
@@ -88,7 +88,7 @@ public class PartitionCreationTest extends ProducerConsumerBase {
         }
     }
 
-    @Test(dataProvider = "topicDomainProvider")
+    @Test(dataProvider = "topicDomainProvider", timeOut = 60000)
     public void testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation(TopicDomain domain) throws PulsarAdminException, PulsarClientException {
         conf.setAllowAutoTopicCreation(true);
         final String topic = domain.value() + "://public/default/testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation";
@@ -96,14 +96,14 @@ public class PartitionCreationTest extends ProducerConsumerBase {
         Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe());
     }
 
-    @Test(dataProvider = "topicDomainProvider")
+    @Test(dataProvider = "topicDomainProvider", timeOut = 60000)
     public void testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation(TopicDomain domain) throws PulsarClientException {
         conf.setAllowAutoTopicCreation(true);
         final String topic = domain.value() + "://public/default/testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation";
         Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe());
     }
 
-    @Test
+    @Test(timeOut = 60000)
     public void testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation() throws Exception {
         conf.setAllowAutoTopicCreation(false);
         final String topic = "testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation-" + System.currentTimeMillis();
@@ -118,7 +118,7 @@ public class PartitionCreationTest extends ProducerConsumerBase {
         Assert.assertEquals(consumer.getConsumers().size(), 5);
     }
 
-    @Test
+    @Test(timeOut = 60000)
     public void testCreateMissedPartitions() throws JsonProcessingException, KeeperException, InterruptedException, PulsarAdminException, PulsarClientException {
         conf.setAllowAutoTopicCreation(false);
         final String topic = "testCreateMissedPartitions";