You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/07 02:28:06 UTC
[pulsar] branch branch-2.7 updated: Fix master broker while
subscribe to non-persistent partitioned topic without topic auto-creation
(#9107)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 9897c7d Fix master broker while subscribe to non-persistent partitioned topic without topic auto-creation (#9107)
9897c7d is described below
commit 9897c7db5b8788e25710fd3dc7a2b693d4bc9b41
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Jan 3 10:48:55 2021 +0800
Fix master broker while subscribe to non-persistent partitioned topic without topic auto-creation (#9107)
### Motivation
After #9029 was merged, a non-persistent topic can be created only when topic auto-creation is enabled; otherwise, the client gets a `Topic does not exist` exception. This looks like an issue caused by concurrent merges, but I have not been able to find another PR related to it.
This PR fixes the test that subscribes to a partitioned non-persistent topic while topic auto-creation is disabled. For now, the fix is to enable topic auto-creation for that test. We do not persist any metadata for non-persistent topics in the metadata server, so users who want to use non-persistent topics must enable topic auto-creation.
(cherry picked from commit 21a3a3b5376fee8afeb01c5decfbdee49ae26a0c)
---
.../pulsar/broker/service/BrokerService.java | 27 ++++++++++++----------
.../pulsar/client/api/PartitionCreationTest.java | 14 +++++------
2 files changed, 22 insertions(+), 19 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 32bf07e..6330e36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1558,19 +1558,22 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
synchronized (multiLayerTopicsMap) {
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> namespaceMap = multiLayerTopicsMap
.get(namespaceName);
- ConcurrentOpenHashMap<String, Topic> bundleMap = namespaceMap.get(bundleName);
- bundleMap.remove(topic);
- if (bundleMap.isEmpty()) {
- namespaceMap.remove(bundleName);
- }
+ if (namespaceMap != null) {
+ ConcurrentOpenHashMap<String, Topic> bundleMap = namespaceMap.get(bundleName);
+ bundleMap.remove(topic);
+ if (bundleMap.isEmpty()) {
+ namespaceMap.remove(bundleName);
+ }
- if (namespaceMap.isEmpty()) {
- multiLayerTopicsMap.remove(namespaceName);
- final ClusterReplicationMetrics clusterReplicationMetrics = pulsarStats
- .getClusterReplicationMetrics();
- replicationClients.forEach((cluster, client) -> {
- clusterReplicationMetrics.remove(clusterReplicationMetrics.getKeyName(namespaceName, cluster));
- });
+ if (namespaceMap.isEmpty()) {
+ multiLayerTopicsMap.remove(namespaceName);
+ final ClusterReplicationMetrics clusterReplicationMetrics = pulsarStats
+ .getClusterReplicationMetrics();
+ replicationClients.forEach((cluster, client) -> {
+ clusterReplicationMetrics.remove(clusterReplicationMetrics.getKeyName(namespaceName,
+ cluster));
+ });
+ }
}
}
} catch (Exception e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
index a217d05..dfdb25f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
@@ -63,15 +63,15 @@ public class PartitionCreationTest extends ProducerConsumerBase {
super.internalCleanup();
}
- @Test(dataProvider = "topicDomainProvider")
+ @Test(dataProvider = "topicDomainProvider", timeOut = 60000)
public void testCreateConsumerForPartitionedTopicWhenDisableTopicAutoCreation(TopicDomain domain) throws PulsarAdminException, PulsarClientException {
- conf.setAllowAutoTopicCreation(false);
+ conf.setAllowAutoTopicCreation(domain.equals(TopicDomain.non_persistent));
final String topic = domain.value() + "://public/default/testCreateConsumerWhenDisableTopicAutoCreation";
admin.topics().createPartitionedTopic(topic, 3);
Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe());
}
- @Test(dataProvider = "topicDomainProvider")
+ @Test(dataProvider = "topicDomainProvider", timeOut = 60000)
public void testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation(TopicDomain domain) throws PulsarClientException {
conf.setAllowAutoTopicCreation(false);
final String topic = domain.value() + "://public/default/testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation";
@@ -88,7 +88,7 @@ public class PartitionCreationTest extends ProducerConsumerBase {
}
}
- @Test(dataProvider = "topicDomainProvider")
+ @Test(dataProvider = "topicDomainProvider", timeOut = 60000)
public void testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation(TopicDomain domain) throws PulsarAdminException, PulsarClientException {
conf.setAllowAutoTopicCreation(true);
final String topic = domain.value() + "://public/default/testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation";
@@ -96,14 +96,14 @@ public class PartitionCreationTest extends ProducerConsumerBase {
Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe());
}
- @Test(dataProvider = "topicDomainProvider")
+ @Test(dataProvider = "topicDomainProvider", timeOut = 60000)
public void testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation(TopicDomain domain) throws PulsarClientException {
conf.setAllowAutoTopicCreation(true);
final String topic = domain.value() + "://public/default/testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation";
Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe());
}
- @Test
+ @Test(timeOut = 60000)
public void testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation() throws Exception {
conf.setAllowAutoTopicCreation(false);
final String topic = "testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation-" + System.currentTimeMillis();
@@ -118,7 +118,7 @@ public class PartitionCreationTest extends ProducerConsumerBase {
Assert.assertEquals(consumer.getConsumers().size(), 5);
}
- @Test
+ @Test(timeOut = 60000)
public void testCreateMissedPartitions() throws JsonProcessingException, KeeperException, InterruptedException, PulsarAdminException, PulsarClientException {
conf.setAllowAutoTopicCreation(false);
final String topic = "testCreateMissedPartitions";