You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/23 06:42:09 UTC

[pulsar] branch master updated: [fix][client] Fix subscription topic name error. (#16719)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 15a00137012 [fix][client] Fix subscription topic name error. (#16719)
15a00137012 is described below

commit 15a00137012669b0f6fd869ca5dbdf50898242ac
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Sat Jul 23 14:41:58 2022 +0800

    [fix][client] Fix subscription topic name error. (#16719)
---
 .../client/impl/PatternTopicsConsumerImplTest.java | 44 +++++++++++++++++-----
 .../impl/PatternMultiTopicsConsumerImpl.java       | 11 +++++-
 2 files changed, 43 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 9b3ff40113d..38f2ca366bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -469,11 +469,6 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
             .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
-        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
-        NamespaceService nss = pulsar.getNamespaceService();
-        doReturn(CompletableFuture.completedFuture(topicNames)).when(nss)
-                .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));
-
         // 5. call recheckTopics to subscribe each added topics above
         log.debug("recheck topics change");
         PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
@@ -514,6 +509,40 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
         producer3.close();
     }
 
+    @Test(timeOut = testTimeout)
+    public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception {
+        String key = "AutoSubscribePatternConsumer";
+        String subscriptionName = "my-ex-subscription-" + key;
+
+        Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topicsPattern(pattern)
+                // Disable automatic discovery.
+                .patternAutoDiscoveryPeriod(1000)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+                .receiverQueueSize(4)
+                .subscribe();
+
+        // 1. create partition
+        String topicName = "persistent://my-property/my-ns/pattern-topic-1-" + key;
+        TenantInfoImpl tenantInfo = createDefaultTenantInfo();
+        admin.tenants().createTenant("prop", tenantInfo);
+        admin.topics().createPartitionedTopic(topicName, 4);
+
+        // 2. verify consumer get methods. There is no need to trigger discovery, because the broker will push the
+        // changes to update(CommandWatchTopicUpdate).
+        assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 4);
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 4);
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 1);
+        });
+
+        consumer.close();
+    }
+
     // simulate subscribe a pattern which has 3 topics, but then matched topic added in.
     @Test(timeOut = testTimeout)
     public void testAutoSubscribePatternConsumer() throws Exception {
@@ -590,11 +619,6 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
             .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
-        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4);
-        NamespaceService nss = pulsar.getNamespaceService();
-        doReturn(CompletableFuture.completedFuture(topicNames)).when(nss)
-                .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));
-
         // 7. call recheckTopics to subscribe each added topics above, verify topics number: 10=1+2+3+4
         log.debug("recheck topics change");
         PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 79a7c6a3ae6..4433bba15d5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -28,9 +28,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -203,9 +205,14 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
                 return addFuture;
             }
 
+            Set<String> addTopicPartitionedName = addedTopics.stream()
+                    .map(addTopicName -> TopicName.get(addTopicName).getPartitionedTopicName())
+                    .collect(Collectors.toSet());
+
             List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(partitionedTopics.size());
-            addedTopics.stream().forEach(topic -> futures.add(
-                    subscribeAsync(topic, false /* createTopicIfDoesNotExist */)));
+            addTopicPartitionedName.forEach(partitionedTopic -> futures.add(
+                    subscribeAsync(partitionedTopic,
+                            false /* createTopicIfDoesNotExist */)));
             FutureUtil.waitForAll(futures)
                 .thenAccept(finalFuture -> addFuture.complete(null))
                 .exceptionally(ex -> {