You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2024/03/26 06:53:34 UTC

(pulsar) branch branch-2.10 updated: [fix] [client] Unclear error message when creating a consumer with two same topics (#22255)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new dc5a73b5624 [fix] [client] Unclear error message when creating a consumer with two same topics (#22255)
dc5a73b5624 is described below

commit dc5a73b5624698ab2d46c9f40586e7fa21b5ec4a
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Tue Mar 19 17:23:13 2024 +0800

    [fix] [client] Unclear error message when creating a consumer with two same topics (#22255)
    
    (cherry picked from commit c616b35e039b59c8eb4709b1859e6377295fb6b8)
---
 .../pulsar/client/api/MultiTopicsConsumerTest.java | 28 ++++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       | 22 ++++++++---------
 2 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index e2b47642d87..2ef1812e36b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -26,6 +26,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import com.google.common.collect.Lists;
+import static org.testng.Assert.fail;
+
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -35,12 +38,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.mockito.AdditionalAnswers;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
@@ -215,4 +220,27 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase {
         assertTrue(consumer instanceof MultiTopicsConsumerImpl);
         assertTrue(consumer.isConnected());
     }
+
+    @Test
+    public void testSameTopics() throws Exception {
+        final String topic1 = BrokerTestUtil.newUniqueName("public/default/tp");
+        final String topic2 = "persistent://" + topic1;
+        admin.topics().createNonPartitionedTopic(topic2);
+        // Subscribe with a short topic name plus its "persistent://" form; must be rejected as duplicates.
+        try {
+            pulsarClient.newConsumer(Schema.INT32).topics(Arrays.asList(topic1, topic2))
+                    .subscriptionName("s1").subscribe();
+            fail("Do not allow use two same topics.");
+        } catch (Exception e) {
+            if (e instanceof PulsarClientException && e.getCause() != null) {
+                e = (Exception) e.getCause();
+            }
+            Throwable unwrapEx = FutureUtil.unwrapCompletionException(e);
+            assertTrue(unwrapEx instanceof IllegalArgumentException);
+            assertTrue(e.getMessage().contains( "Subscription topics include duplicate items"
+                    + " or invalid names"));
+        }
+        // Cleanup: delete the topic created at the start of this test.
+        admin.topics().delete(topic2);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 163c2c0da11..37dfe380295 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -177,8 +177,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             return;
         }
 
-        checkArgument(conf.getTopicNames().isEmpty()
-                || topicNamesValid(conf.getTopicNames()), "Topics is empty or invalid.");
+        checkArgument(topicNamesValid(conf.getTopicNames()), "Subscription topics include duplicate items"
+                + " or invalid names.");
 
         List<CompletableFuture<Void>> futures = conf.getTopicNames().stream()
                 .map(t -> subscribeAsync(t, createTopicIfDoesNotExist))
@@ -215,21 +215,21 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         checkState(topics != null && topics.size() >= 1,
             "topics should contain more than 1 topic");
 
-        Optional<String> result = topics.stream()
-                .filter(topic -> !TopicName.isValid(topic))
-                .findFirst();
+        Set<TopicName> topicNames = new HashSet<>();
 
-        if (result.isPresent()) {
-            log.warn("Received invalid topic name: {}", result.get());
-            return false;
+        for (String topic : topics) {
+            if (!TopicName.isValid(topic)) {
+                log.warn("Received invalid topic name: {}", topic);
+                return false;
+            }
+            topicNames.add(TopicName.get(topic));
         }
 
         // check topic names are unique
-        HashSet<String> set = new HashSet<>(topics);
-        if (set.size() == topics.size()) {
+        if (topicNames.size() == topics.size()) {
             return true;
         } else {
-            log.warn("Topic names not unique. unique/all : {}/{}", set.size(), topics.size());
+            log.warn("Topic names not unique. unique/all : {}/{}", topicNames.size(), topics.size());
             return false;
         }
     }