You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/06/29 15:16:34 UTC

[pulsar] branch master updated: [Client] Consumer for a single partition of a PartitionedTopic should be set with correct partitionIndex (#4591)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5de6e6b  [Client] Consumer for a single partition of a PartitionedTopic should be set with correct partitionIndex (#4591)
5de6e6b is described below

commit 5de6e6ba9b566754f099fd1bd96809ed75ac1543
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sat Jun 29 23:16:28 2019 +0800

    [Client] Consumer for a single partition of a PartitionedTopic should be set with correct partitionIndex (#4591)
    
    This PR fixes #4586 by getting `partitionIndex` from topic name.
---
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  1 +
 .../api/PartitionedProducerConsumerTest.java       | 72 +++++++++++++++++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 18 +++---
 .../client/impl/MultiTopicsConsumerImpl.java       |  6 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |  3 +-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |  2 +-
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |  8 +--
 .../pulsar/client/impl/ConsumerImplTest.java       |  2 +-
 8 files changed, 91 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index d1619f5..e6e4c19 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -108,6 +108,7 @@ public class RawReaderImpl implements RawReader {
                 conf,
                 client.externalExecutorProvider().getExecutor(),
                 TopicName.getPartitionIndex(conf.getSingleTopic()),
+                false,
                 consumerFuture,
                 SubscriptionMode.Durable,
                 MessageId.earliest,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index 0a88963..4ab650c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -35,9 +35,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
-import org.apache.pulsar.client.impl.PartitionedProducerImpl;
-import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.*;
 import org.apache.pulsar.common.naming.TopicName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -670,6 +668,74 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
                 Collections.singletonList(nonPartitionedTopic));
     }
 
+    @Test
+    public void testMessageIdForSubscribeToSinglePartition() throws Exception {
+        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        TopicName topicName = null;
+        TopicName partition2TopicName = null;
+        Producer<byte[]> producer = null;
+        Consumer<byte[]> consumer1 = null;
+        Consumer<byte[]> consumer2 = null;
+        final int numPartitions = 4;
+        final int totalMessages = 30;
+
+        try {
+            log.info("-- Starting {} test --", methodName);
+
+            topicName = TopicName.get("persistent://my-property/my-ns/my-topic-" + System.currentTimeMillis());
+            partition2TopicName = topicName.getPartition(2);
+
+            admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
+
+            producer = pulsarClient.newProducer().topic(topicName.toString())
+                .messageRouter(new AlwaysTwoMessageRouter())
+                .create();
+
+            consumer1 = pulsarClient.newConsumer().topic(topicName.toString())
+                .subscriptionName("subscriber-partitioned")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscribe();
+
+            consumer2 = pulsarClient.newConsumer().topic(partition2TopicName.toString())
+                .subscriptionName("subscriber-single")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscribe();
+
+            for (int i = 0; i < totalMessages; i++) {
+                String message = "my-message-" + i;
+                producer.newMessage().key(String.valueOf(i)).value(message.getBytes()).send();
+            }
+            producer.flush();
+
+            Message<byte[]> msg;
+
+            for (int i = 0; i < totalMessages; i ++) {
+                msg = consumer1.receive(5, TimeUnit.SECONDS);
+                Assert.assertEquals(((MessageIdImpl)((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId()).getPartitionIndex(), 2);
+                consumer1.acknowledge(msg);
+            }
+
+            for (int i = 0; i < totalMessages; i ++) {
+                msg = consumer2.receive(5, TimeUnit.SECONDS);
+                Assert.assertEquals(((MessageIdImpl)msg.getMessageId()).getPartitionIndex(), 2);
+                consumer2.acknowledge(msg);
+            }
+
+        } finally {
+            producer.close();
+            consumer1.unsubscribe();
+            consumer1.close();
+            consumer2.unsubscribe();
+            consumer2.close();
+            pulsarClient.close();
+            admin.topics().deletePartitionedTopic(topicName.toString());
+
+            log.info("-- Exiting {} test --", methodName);
+        }
+    }
+
 
     /**
      * It verifies that consumer producer auto update for partitions extend.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8704fd3..11bb520 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -103,6 +103,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     private long subscribeTimeout;
     private final int partitionIndex;
+    private final boolean hasParentConsumer;
 
     private final int receiverQueueRefillThreshold;
 
@@ -153,27 +154,27 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     }
 
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
-                 ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
+                 ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
                  SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
-        return ConsumerImpl.newConsumerImpl(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode,
+        return ConsumerImpl.newConsumerImpl(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode,
                                             startMessageId, schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
     }
 
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
-            ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
+            ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
             SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
             long backoffIntervalNanos, long maxBackoffIntervalNanos) {
     	if (conf.getReceiverQueueSize() == 0) {
-            return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture,
+            return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
                     subscriptionMode, startMessageId, schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
         } else {
-            return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture,
+            return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
                     subscriptionMode, startMessageId, schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
         }
     }
 
     protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
-                 ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
+                 ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
                  SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
                  long backoffIntervalNanos, long maxBackoffIntervalNanos) {
         super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors);
@@ -183,6 +184,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         AVAILABLE_PERMITS_UPDATER.set(this, 0);
         this.subscribeTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs();
         this.partitionIndex = partitionIndex;
+        this.hasParentConsumer = hasParentConsumer;
         this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2;
         this.priorityLevel = conf.getPriorityLevel();
         this.readCompacted = conf.isReadCompacted();
@@ -548,7 +550,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             // command to receive messages.
             // For readers too (isDurable==false), the partition idx will be set though we have to
             // send available permits immediately after establishing the reader session
-            if (!(firstTimeConnect && partitionIndex > -1 && isDurable) && conf.getReceiverQueueSize() != 0) {
+            if (!(firstTimeConnect && hasParentConsumer && isDurable) && conf.getReceiverQueueSize() != 0) {
                 sendFlowPermitsToBroker(cnx, conf.getReceiverQueueSize());
             }
         }).exceptionally((e) -> {
@@ -1050,7 +1052,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     // do not add each item in batch message into tracker
                     id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
                 }
-                if (partitionIndex != -1) {
+                if (hasParentConsumer) {
                     // we should no longer track this message, TopicsConsumer will take care from now onwards
                     unAckedMessageTracker.remove(id);
                 } else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 5d45ca0..e50e8bc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -760,7 +760,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                         CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
                         ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
                                 configurationData, client.externalExecutorProvider().getExecutor(),
-                                partitionIndex, subFuture,
+                                partitionIndex, true, subFuture,
                                 SubscriptionMode.Durable, null, schema, interceptors,
                                 client.getConfiguration().getDefaultBackoffIntervalNanos(),
                                 client.getConfiguration().getMaxBackoffIntervalNanos());
@@ -774,7 +774,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
             CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
             ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
-                    client.externalExecutorProvider().getExecutor(), 0, subFuture, SubscriptionMode.Durable, null,
+                    client.externalExecutorProvider().getExecutor(), -1, true, subFuture, SubscriptionMode.Durable, null,
                     schema, interceptors, client.getConfiguration().getDefaultBackoffIntervalNanos(),
                     client.getConfiguration().getMaxBackoffIntervalNanos());
             consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
@@ -992,7 +992,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                         ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
                             client, partitionName, configurationData,
                             client.externalExecutorProvider().getExecutor(),
-                            partitionIndex, subFuture, SubscriptionMode.Durable, null, schema, interceptors,
+                            partitionIndex, true, subFuture, SubscriptionMode.Durable, null, schema, interceptors,
                             client.getConfiguration().getDefaultBackoffIntervalNanos(),
                             client.getConfiguration().getMaxBackoffIntervalNanos());
                         consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index d345da5..bceaf2c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -353,7 +353,8 @@ public class PulsarClientImpl implements PulsarClient {
                 consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
                     listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
             } else {
-                consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, -1,
+                int partitionIndex = TopicName.getPartitionIndex(topic);
+                consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, partitionIndex, false,
                         consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors,
                         this.conf.getDefaultBackoffIntervalNanos(), this.conf.getMaxBackoffIntervalNanos());
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 71953b9..df71360 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -83,7 +83,7 @@ public class ReaderImpl<T> implements Reader<T> {
 
         final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
         consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
-                partitionIdx, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null,
+                partitionIdx, false, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null,
                 client.getConfiguration().getDefaultBackoffIntervalNanos(), client.getConfiguration().getMaxBackoffIntervalNanos());
         
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 6beb0eb..3f3f5bd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -47,18 +47,18 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
     private volatile boolean waitingOnReceiveForZeroQueueSize = false;
 
     public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
-            ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
+            ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
             SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
             ConsumerInterceptors<T> interceptors) {
-    	this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode, startMessageId,
+    	this(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode, startMessageId,
     		 schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
     }
 
     public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
-            ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
+            ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
             SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
             ConsumerInterceptors<T> interceptors, long backoffIntervalNanos, long maxBackoffIntervalNanos) {
-        super(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode, startMessageId,
+        super(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode, startMessageId,
               schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
     }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 4e6bad6..f463259 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -64,7 +64,7 @@ public class ConsumerImplTest {
 
         consumerConf.setSubscriptionName("test-sub");
         consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
-                executorService, -1, subscribeFuture, SubscriptionMode.Durable, null, null, null,
+                executorService, -1, false, subscribeFuture, SubscriptionMode.Durable, null, null, null,
                 clientConf.getDefaultBackoffIntervalNanos(), clientConf.getMaxBackoffIntervalNanos());
     }