You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2024/03/26 06:53:34 UTC
(pulsar) branch branch-2.10 updated: [fix] [client] Unclear error message when creating a consumer with two same topics (#22255)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new dc5a73b5624 [fix] [client] Unclear error message when creating a consumer with two same topics (#22255)
dc5a73b5624 is described below
commit dc5a73b5624698ab2d46c9f40586e7fa21b5ec4a
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Tue Mar 19 17:23:13 2024 +0800
[fix] [client] Unclear error message when creating a consumer with two same topics (#22255)
(cherry picked from commit c616b35e039b59c8eb4709b1859e6377295fb6b8)
---
.../pulsar/client/api/MultiTopicsConsumerTest.java | 28 ++++++++++++++++++++++
.../client/impl/MultiTopicsConsumerImpl.java | 22 ++++++++---------
2 files changed, 39 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index e2b47642d87..2ef1812e36b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -26,6 +26,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.collect.Lists;
+import static org.testng.Assert.fail;
+
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -35,12 +38,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.slf4j.Logger;
@@ -215,4 +220,27 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase {
assertTrue(consumer instanceof MultiTopicsConsumerImpl);
assertTrue(consumer.isConnected());
}
+
+    @Test
+    public void testSameTopics() throws Exception {
+        // topic1 and topic2 differ only by the "persistent://" prefix; subscribing to both must be rejected.
+        final String topic1 = BrokerTestUtil.newUniqueName("public/default/tp");
+        final String topic2 = "persistent://" + topic1;
+        admin.topics().createNonPartitionedTopic(topic2);
+        try {
+            pulsarClient.newConsumer(Schema.INT32).topics(Arrays.asList(topic1, topic2))
+                    .subscriptionName("s1").subscribe();
+            fail("Do not allow use two same topics.");
+        } catch (Exception e) {
+            // Step past a PulsarClientException wrapper without casting its cause, then unwrap completion.
+            Throwable cause = e instanceof PulsarClientException && e.getCause() != null ? e.getCause() : e;
+            Throwable unwrapEx = FutureUtil.unwrapCompletionException(cause);
+            assertTrue(unwrapEx instanceof IllegalArgumentException);
+            assertTrue(unwrapEx.getMessage().contains("Subscription topics include duplicate items"
+                    + " or invalid names"));
+        } finally {
+            // Delete the topic even when an assertion above fails.
+            admin.topics().delete(topic2);
+        }
+    }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 163c2c0da11..37dfe380295 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -177,8 +177,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
return;
}
- checkArgument(conf.getTopicNames().isEmpty()
- || topicNamesValid(conf.getTopicNames()), "Topics is empty or invalid.");
+ checkArgument(topicNamesValid(conf.getTopicNames()), "Subscription topics include duplicate items"
+ + " or invalid names.");
List<CompletableFuture<Void>> futures = conf.getTopicNames().stream()
.map(t -> subscribeAsync(t, createTopicIfDoesNotExist))
@@ -215,21 +215,21 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
checkState(topics != null && topics.size() >= 1,
"topics should contain more than 1 topic");
- Optional<String> result = topics.stream()
- .filter(topic -> !TopicName.isValid(topic))
- .findFirst();
+ Set<TopicName> topicNames = new HashSet<>();
- if (result.isPresent()) {
- log.warn("Received invalid topic name: {}", result.get());
- return false;
+ for (String topic : topics) {
+ if (!TopicName.isValid(topic)) {
+ log.warn("Received invalid topic name: {}", topic);
+ return false;
+ }
+ topicNames.add(TopicName.get(topic));
}
// check topic names are unique
- HashSet<String> set = new HashSet<>(topics);
- if (set.size() == topics.size()) {
+ if (topicNames.size() == topics.size()) {
return true;
} else {
- log.warn("Topic names not unique. unique/all : {}/{}", set.size(), topics.size());
+ log.warn("Topic names not unique. unique/all : {}/{}", topicNames.size(), topics.size());
return false;
}
}