You are viewing a plain-text version of this content. The canonical link was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/19 00:49:30 UTC

[pulsar] 15/26: [client] Set actual topic name to partitioned consumer (#4064)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 487eaff759dd687c91f821907e83974ea4c3b7aa
Author: massakam <ma...@yahoo-corp.jp>
AuthorDate: Thu Apr 18 07:09:50 2019 +0900

    [client] Set actual topic name to partitioned consumer (#4064)
    
    * Set the actual topic name on the partitioned consumer
    
    * Change the dummy topic name prefix from "TopicsConsumerFakeTopicName" to "MultiTopicsConsumer-"
---
 .../client/api/PartitionedProducerConsumerTest.java |  1 +
 .../client/impl/PatternTopicsConsumerImplTest.java  |  1 +
 .../pulsar/client/impl/TopicsConsumerImplTest.java  |  1 +
 .../pulsar/client/impl/MultiTopicsConsumerImpl.java | 21 ++++++++++++++++-----
 4 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index 55952b7..0a88963 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -87,6 +87,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName.toString())
                 .subscriptionName("my-partitioned-subscriber").subscribe();
+        assertEquals(consumer.getTopic(), topicName.toString());
 
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 9a09ef7..4228fdb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -172,6 +172,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
             .receiverQueueSize(4)
             .subscribe();
+        assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
 
         // 4. verify consumer get methods, to get right number of partitions and topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index fc46586..69f1c74 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -121,6 +121,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
             .receiverQueueSize(4)
             .subscribe();
         assertTrue(consumer instanceof MultiTopicsConsumerImpl);
+        assertTrue(consumer.getTopic().startsWith(MultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
 
         List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics();
         List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 39c0511..df20e2a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -63,6 +63,8 @@ import org.slf4j.LoggerFactory;
 
 public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
+    public static final String DUMMY_TOPIC_NAME_PREFIX = "MultiTopicsConsumer-";
+
     // All topics should be in same namespace
     protected NamespaceName namespaceName;
 
@@ -93,10 +95,18 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     private final UnAckedMessageTracker unAckedMessageTracker;
     private final ConsumerConfigurationData<T> internalConfig;
 
-    MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor,
-                            CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
-        super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), conf,
-                Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture, schema, interceptors);
+    MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
+            ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
+            ConsumerInterceptors<T> interceptors) {
+        this(client, DUMMY_TOPIC_NAME_PREFIX + ConsumerName.generateRandomName(), conf, listenerExecutor,
+                subscribeFuture, schema, interceptors);
+    }
+
+    MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, ConsumerConfigurationData<T> conf,
+            ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
+            ConsumerInterceptors<T> interceptors) {
+        super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture,
+                schema, interceptors);
 
         checkArgument(conf.getReceiverQueueSize() > 0,
             "Receiver queue size needs to be greater than 0 for Topics Consumer");
@@ -663,7 +673,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         cloneConf.getTopicNames().remove(topicName);
 
         CompletableFuture<Consumer> future = new CompletableFuture<>();
-        MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, cloneConf, listenerExecutor, future, schema, interceptors);
+        MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, topicName, cloneConf, listenerExecutor,
+                future, schema, interceptors);
 
         future.thenCompose(c -> ((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions))
             .thenRun(()-> subscribeFuture.complete(consumer))