You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/23 06:42:09 UTC
[pulsar] branch master updated: [fix][client] Fix subscription topic name error. (#16719)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 15a00137012 [fix][client] Fix subscription topic name error. (#16719)
15a00137012 is described below
commit 15a00137012669b0f6fd869ca5dbdf50898242ac
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Sat Jul 23 14:41:58 2022 +0800
[fix][client] Fix subscription topic name error. (#16719)
---
.../client/impl/PatternTopicsConsumerImplTest.java | 44 +++++++++++++++++-----
.../impl/PatternMultiTopicsConsumerImpl.java | 11 +++++-
2 files changed, 43 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 9b3ff40113d..38f2ca366bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -469,11 +469,6 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
.create();
- List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
- NamespaceService nss = pulsar.getNamespaceService();
- doReturn(CompletableFuture.completedFuture(topicNames)).when(nss)
- .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));
-
// 5. call recheckTopics to subscribe each added topics above
log.debug("recheck topics change");
PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
@@ -514,6 +509,40 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
producer3.close();
}
+ @Test(timeOut = testTimeout)
+ public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception {
+ String key = "AutoSubscribePatternConsumer";
+ String subscriptionName = "my-ex-subscription-" + key;
+
+ Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topicsPattern(pattern)
+ // Use a very long discovery period to effectively disable periodic auto discovery during this test.
+ .patternAutoDiscoveryPeriod(1000)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+ .receiverQueueSize(4)
+ .subscribe();
+
+ // 1. create a partitioned topic whose name matches the pattern
+ String topicName = "persistent://my-property/my-ns/pattern-topic-1-" + key;
+ TenantInfoImpl tenantInfo = createDefaultTenantInfo();
+ admin.tenants().createTenant("prop", tenantInfo);
+ admin.topics().createPartitionedTopic(topicName, 4);
+
+ // 2. verify the consumer's getters. There is no need to trigger discovery manually, because the broker
+ // pushes topic list changes to the client (CommandWatchTopicUpdate).
+ assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 4);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 4);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 1);
+ });
+
+ consumer.close();
+ }
+
// simulate subscribe a pattern which has 3 topics, but then matched topic added in.
@Test(timeOut = testTimeout)
public void testAutoSubscribePatternConsumer() throws Exception {
@@ -590,11 +619,6 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
.create();
- List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4);
- NamespaceService nss = pulsar.getNamespaceService();
- doReturn(CompletableFuture.completedFuture(topicNames)).when(nss)
- .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));
-
// 7. call recheckTopics to subscribe each added topics above, verify topics number: 10=1+2+3+4
log.debug("recheck topics change");
PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 79a7c6a3ae6..4433bba15d5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -28,9 +28,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -203,9 +205,14 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
return addFuture;
}
+ Set<String> addTopicPartitionedName = addedTopics.stream()
+ .map(addTopicName -> TopicName.get(addTopicName).getPartitionedTopicName())
+ .collect(Collectors.toSet());
+
List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(partitionedTopics.size());
- addedTopics.stream().forEach(topic -> futures.add(
- subscribeAsync(topic, false /* createTopicIfDoesNotExist */)));
+ addTopicPartitionedName.forEach(partitionedTopic -> futures.add(
+ subscribeAsync(partitionedTopic,
+ false /* createTopicIfDoesNotExist */)));
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> addFuture.complete(null))
.exceptionally(ex -> {