You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/19 00:49:30 UTC
[pulsar] 15/26: [client] Set actual topic name to partitioned
consumer (#4064)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 487eaff759dd687c91f821907e83974ea4c3b7aa
Author: massakam <ma...@yahoo-corp.jp>
AuthorDate: Thu Apr 18 07:09:50 2019 +0900
[client] Set actual topic name to partitioned consumer (#4064)
* Set actual topic name to partitioned consumer
* Change dummy topic name prefix
---
.../client/api/PartitionedProducerConsumerTest.java | 1 +
.../client/impl/PatternTopicsConsumerImplTest.java | 1 +
.../pulsar/client/impl/TopicsConsumerImplTest.java | 1 +
.../pulsar/client/impl/MultiTopicsConsumerImpl.java | 21 ++++++++++++++++-----
4 files changed, 19 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index 55952b7..0a88963 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -87,6 +87,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName.toString())
.subscriptionName("my-partitioned-subscriber").subscribe();
+ assertEquals(consumer.getTopic(), topicName.toString());
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 9a09ef7..4228fdb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -172,6 +172,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
+ assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index fc46586..69f1c74 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -121,6 +121,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
.receiverQueueSize(4)
.subscribe();
assertTrue(consumer instanceof MultiTopicsConsumerImpl);
+ assertTrue(consumer.getTopic().startsWith(MultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics();
List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 39c0511..df20e2a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -63,6 +63,8 @@ import org.slf4j.LoggerFactory;
public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
+ public static final String DUMMY_TOPIC_NAME_PREFIX = "MultiTopicsConsumer-";
+
// All topics should be in same namespace
protected NamespaceName namespaceName;
@@ -93,10 +95,18 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
private final UnAckedMessageTracker unAckedMessageTracker;
private final ConsumerConfigurationData<T> internalConfig;
- MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor,
- CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
- super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), conf,
- Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture, schema, interceptors);
+ MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
+ ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
+ ConsumerInterceptors<T> interceptors) {
+ this(client, DUMMY_TOPIC_NAME_PREFIX + ConsumerName.generateRandomName(), conf, listenerExecutor,
+ subscribeFuture, schema, interceptors);
+ }
+
+ MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, ConsumerConfigurationData<T> conf,
+ ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
+ ConsumerInterceptors<T> interceptors) {
+ super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture,
+ schema, interceptors);
checkArgument(conf.getReceiverQueueSize() > 0,
"Receiver queue size needs to be greater than 0 for Topics Consumer");
@@ -663,7 +673,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
cloneConf.getTopicNames().remove(topicName);
CompletableFuture<Consumer> future = new CompletableFuture<>();
- MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, cloneConf, listenerExecutor, future, schema, interceptors);
+ MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, topicName, cloneConf, listenerExecutor,
+ future, schema, interceptors);
future.thenCompose(c -> ((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions))
.thenRun(()-> subscribeFuture.complete(consumer))