You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/02 15:55:05 UTC

[pulsar] branch branch-2.3 updated: Fixed reader reading from a partition (#3960)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 5fe9ca4  Fixed reader reading from a partition (#3960)
5fe9ca4 is described below

commit 5fe9ca469827b550189f2931fbc1ab254a43efe6
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Apr 2 08:54:05 2019 -0700

    Fixed reader reading from a partition (#3960)
---
 .../org/apache/pulsar/client/impl/ReaderTest.java   | 21 +++++++++++++++++++++
 .../org/apache/pulsar/client/impl/ConsumerImpl.java |  6 ++++--
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 41ed9cf..94bce85 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -112,4 +112,25 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
         }
         Assert.assertTrue(keys.isEmpty());
     }
+
+
+    @Test
+    public void testReadFromPartition() throws Exception {
+        String topic = "persistent://my-property/my-ns/testReadFromPartition";
+        String partition0 = topic + "-partition-0";
+        admin.topics().createPartitionedTopic(topic, 4);
+        int numKeys = 10;
+
+        Set<String> keys = publishMessages(partition0, numKeys, false);
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(partition0)
+                .startMessageId(MessageId.earliest)
+                .create();
+
+        while (reader.hasMessageAvailable()) {
+            Message<byte[]> message = reader.readNext();
+            Assert.assertTrue(keys.remove(message.getKey()));
+        }
+        Assert.assertTrue(keys.isEmpty());
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 12e1ebe..266f483 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -591,8 +591,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
             boolean firstTimeConnect = subscribeFuture.complete(this);
             // if the consumer is not partitioned or is re-connected and is partitioned, we send the flow
-            // command to receive messages
-            if (!(firstTimeConnect && partitionIndex > -1) && conf.getReceiverQueueSize() != 0) {
+            // command to receive messages.
+            // Readers (isDurable == false) also have the partition index set, but for them we must
+            // send the available permits immediately after establishing the reader session.
+            if (!(firstTimeConnect && partitionIndex > -1 && isDurable) && conf.getReceiverQueueSize() != 0) {
                 sendFlowPermitsToBroker(cnx, conf.getReceiverQueueSize());
             }
         }).exceptionally((e) -> {