You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/12/01 02:55:26 UTC

[pulsar] branch master updated: Fix reader always has message available (#8735)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d8974d  Fix reader always has message available (#8735)
4d8974d is described below

commit 4d8974d8581af53b386dd1d58edca42250e377fe
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Tue Dec 1 10:55:06 2020 +0800

    Fix reader always has message available (#8735)
    
    Fixes #8721
    
    ### Motivation
    Whenever you create a new Reader with startMessageId set to latest and startMessageIdInclusive enabled, hasMessageAvailable always returns true, even if the topic is freshly created and contains no messages. Without startMessageIdInclusive, the issue does not appear.
    
    ### Modifications
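    If the last message ID returned by getLastMessageIdAsync is null or has a negative (invalid) entry ID, the topic contains no messages, so hasMessageAvailable now completes with false.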
    
    ### Verifying this change
    ReaderTest#testReaderHasMessageAvailable
---
 .../test/java/org/apache/pulsar/client/impl/ReaderTest.java  | 12 ++++++++++++
 .../java/org/apache/pulsar/client/impl/ConsumerImpl.java     |  9 +++++++--
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index ab00a84..8400102 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -303,6 +303,18 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testReaderHasMessageAvailable() throws Exception {
+        final String topic = "persistent://my-property/my-ns/testReaderHasMessageAvailable" + System.currentTimeMillis();
+        @Cleanup
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.latest)
+                .startMessageIdInclusive()
+                .create();
+        assertFalse(reader.hasMessageAvailable());
+    }
+
+    @Test
     public void testKeyHashRangeReader() throws IOException {
          final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
         final String topic = "persistent://my-property/my-ns/testKeyHashRangeReader";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c87bdc7..1f2613c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2010,11 +2010,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     startMessageId.partitionIndex == -1) {
 
                 getLastMessageIdAsync()
-                        .thenCompose(this::seekAsync)
-                        .whenComplete((ignore, e) -> {
+                        .thenCompose((msgId) -> seekAsync(msgId).thenApply((ignore) -> msgId))
+                        .whenComplete((msgId, e) -> {
                             if (e != null) {
                                 log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
                                 booleanFuture.completeExceptionally(e.getCause());
+                                return;
+                            }
+                            MessageIdImpl messageId = MessageIdImpl.convertToMessageIdImpl(msgId);
+                            if (messageId == null || messageId.getEntryId() < 0) {
+                                booleanFuture.complete(false);
                             } else {
                                 booleanFuture.complete(resetIncludeHead);
                             }