You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/12/01 02:55:26 UTC
[pulsar] branch master updated: Fix reader always has message
available (#8735)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4d8974d Fix reader always has message available (#8735)
4d8974d is described below
commit 4d8974d8581af53b386dd1d58edca42250e377fe
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Tue Dec 1 10:55:06 2020 +0800
Fix reader always has message available (#8735)
Fixes #8721
### Motivation
When a Reader is created with `startMessageId` set to `MessageId.latest` together with `startMessageIdInclusive()`, `hasMessageAvailable()` always returns true, even if the topic is freshly created and contains no messages. Without `startMessageIdInclusive()`, the issue does not occur.
### Modifications
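If the last message ID returned by `getLastMessageIdAsync` is null or has an invalid (negative) entry ID, the topic contains no messages, so `hasMessageAvailable` now completes with `false`.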
### Verifying this change
ReaderTest#testReaderHasMessageAvailable
---
.../test/java/org/apache/pulsar/client/impl/ReaderTest.java | 12 ++++++++++++
.../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 9 +++++++--
2 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index ab00a84..8400102 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -303,6 +303,18 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
}
@Test
+ public void testReaderHasMessageAvailable() throws Exception {
+ final String topic = "persistent://my-property/my-ns/testReaderHasMessageAvailable" + System.currentTimeMillis();
+ @Cleanup
+ Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+ .topic(topic)
+ .startMessageId(MessageId.latest)
+ .startMessageIdInclusive()
+ .create();
+ assertFalse(reader.hasMessageAvailable());
+ }
+
+ @Test
public void testKeyHashRangeReader() throws IOException {
final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
final String topic = "persistent://my-property/my-ns/testKeyHashRangeReader";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c87bdc7..1f2613c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2010,11 +2010,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
startMessageId.partitionIndex == -1) {
getLastMessageIdAsync()
- .thenCompose(this::seekAsync)
- .whenComplete((ignore, e) -> {
+ .thenCompose((msgId) -> seekAsync(msgId).thenApply((ignore) -> msgId))
+ .whenComplete((msgId, e) -> {
if (e != null) {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
booleanFuture.completeExceptionally(e.getCause());
+ return;
+ }
+ MessageIdImpl messageId = MessageIdImpl.convertToMessageIdImpl(msgId);
+ if (messageId == null || messageId.getEntryId() < 0) {
+ booleanFuture.complete(false);
} else {
booleanFuture.complete(resetIncludeHead);
}