You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/02 15:55:05 UTC
[pulsar] branch branch-2.3 updated: Fixed reader reading from a
partition (#3960)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new 5fe9ca4 Fixed reader reading from a partition (#3960)
5fe9ca4 is described below
commit 5fe9ca469827b550189f2931fbc1ab254a43efe6
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Apr 2 08:54:05 2019 -0700
Fixed reader reading from a partition (#3960)
---
.../org/apache/pulsar/client/impl/ReaderTest.java | 21 +++++++++++++++++++++
.../org/apache/pulsar/client/impl/ConsumerImpl.java | 6 ++++--
2 files changed, 25 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 41ed9cf..94bce85 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -112,4 +112,25 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
}
Assert.assertTrue(keys.isEmpty());
}
+
+
+ @Test
+ public void testReadFromPartition() throws Exception {
+ String topic = "persistent://my-property/my-ns/testReadFromPartition";
+ String partition0 = topic + "-partition-0";
+ admin.topics().createPartitionedTopic(topic, 4);
+ int numKeys = 10;
+
+ Set<String> keys = publishMessages(partition0, numKeys, false);
+ Reader<byte[]> reader = pulsarClient.newReader()
+ .topic(partition0)
+ .startMessageId(MessageId.earliest)
+ .create();
+
+ while (reader.hasMessageAvailable()) {
+ Message<byte[]> message = reader.readNext();
+ Assert.assertTrue(keys.remove(message.getKey()));
+ }
+ Assert.assertTrue(keys.isEmpty());
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 12e1ebe..266f483 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -591,8 +591,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
boolean firstTimeConnect = subscribeFuture.complete(this);
// if the consumer is not partitioned or is re-connected and is partitioned, we send the flow
- // command to receive messages
- if (!(firstTimeConnect && partitionIndex > -1) && conf.getReceiverQueueSize() != 0) {
+ // command to receive messages.
+ // For readers too (isDurable==false), the partition idx will be set though we have to
+ // send available permits immediately after establishing the reader session
+ if (!(firstTimeConnect && partitionIndex > -1 && isDurable) && conf.getReceiverQueueSize() != 0) {
sendFlowPermitsToBroker(cnx, conf.getReceiverQueueSize());
}
}).exceptionally((e) -> {