You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/02 15:54:12 UTC

[pulsar] branch master updated: Fixed reader reading from a partition (#3960)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b68376  Fixed reader reading from a partition (#3960)
7b68376 is described below

commit 7b683760ac3fac7e2a3075a7f2b28724b881d21f
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Apr 2 08:54:05 2019 -0700

    Fixed reader reading from a partition (#3960)
---
 .../org/apache/pulsar/client/impl/ReaderTest.java   | 21 +++++++++++++++++++++
 .../org/apache/pulsar/client/impl/ConsumerImpl.java |  6 ++++--
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 41ed9cf..94bce85 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -112,4 +112,25 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
         }
         Assert.assertTrue(keys.isEmpty());
     }
+
+
+    @Test
+    public void testReadFromPartition() throws Exception {
+        String topic = "persistent://my-property/my-ns/testReadFromPartition";
+        String partition0 = topic + "-partition-0";
+        admin.topics().createPartitionedTopic(topic, 4);
+        int numKeys = 10;
+
+        Set<String> keys = publishMessages(partition0, numKeys, false);
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(partition0)
+                .startMessageId(MessageId.earliest)
+                .create();
+
+        while (reader.hasMessageAvailable()) {
+            Message<byte[]> message = reader.readNext();
+            Assert.assertTrue(keys.remove(message.getKey()));
+        }
+        Assert.assertTrue(keys.isEmpty());
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c4cdb50..fc0c660 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -529,8 +529,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
             boolean firstTimeConnect = subscribeFuture.complete(this);
             // if the consumer is not partitioned or is re-connected and is partitioned, we send the flow
-            // command to receive messages
-            if (!(firstTimeConnect && partitionIndex > -1) && conf.getReceiverQueueSize() != 0) {
+            // command to receive messages.
+            // For readers too (isDurable==false), the partition idx will be set though we have to
+            // send available permits immediately after establishing the reader session
+            if (!(firstTimeConnect && partitionIndex > -1 && isDurable) && conf.getReceiverQueueSize() != 0) {
                 sendFlowPermitsToBroker(cnx, conf.getReceiverQueueSize());
             }
         }).exceptionally((e) -> {