You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/02 09:29:35 UTC

[pulsar] branch branch-2.7 updated: Fix reader always has message available (#8735) (#8755)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 253ed52  Fix reader always has message available (#8735) (#8755)
253ed52 is described below

commit 253ed5271cb24e4f2e87c8be6c3554948fa3a155
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Dec 2 17:24:42 2020 +0800

    Fix reader always has message available (#8735) (#8755)
    
    Fixes #8721
    
    ### Motivation
    When a Reader is created with startMessageId set to MessageId.latest together with startMessageIdInclusive, hasMessageAvailable always returns true, even if the topic is freshly created and contains no messages. Without startMessageIdInclusive, the issue does not occur.
    
    ### Modifications
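    If the message ID returned by getLastMessageIdAsync is null or has a negative entry ID, there is no message in the topic, so hasMessageAvailable now completes with false instead of true.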
    
    ### Verifying this change
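    Added ReaderTest#testReaderHasMessageAvailable, which asserts that hasMessageAvailable returns false for a reader created with MessageId.latest and startMessageIdInclusive on a topic with no messages.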
    
    (cherry picked from commit 4d8974d8581af53b386dd1d58edca42250e377fe)
    
    Co-authored-by: feynmanlin <fe...@tencent.com>
---
 .../test/java/org/apache/pulsar/client/impl/ReaderTest.java  | 12 ++++++++++++
 .../java/org/apache/pulsar/client/impl/ConsumerImpl.java     |  9 +++++++--
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index ab00a84..8400102 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -303,6 +303,18 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testReaderHasMessageAvailable() throws Exception {
+        final String topic = "persistent://my-property/my-ns/testReaderHasMessageAvailable" + System.currentTimeMillis();
+        @Cleanup
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.latest)
+                .startMessageIdInclusive()
+                .create();
+        assertFalse(reader.hasMessageAvailable());
+    }
+
+    @Test
     public void testKeyHashRangeReader() throws IOException {
          final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
         final String topic = "persistent://my-property/my-ns/testKeyHashRangeReader";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c87bdc7..1f2613c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2010,11 +2010,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     startMessageId.partitionIndex == -1) {
 
                 getLastMessageIdAsync()
-                        .thenCompose(this::seekAsync)
-                        .whenComplete((ignore, e) -> {
+                        .thenCompose((msgId) -> seekAsync(msgId).thenApply((ignore) -> msgId))
+                        .whenComplete((msgId, e) -> {
                             if (e != null) {
                                 log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
                                 booleanFuture.completeExceptionally(e.getCause());
+                                return;
+                            }
+                            MessageIdImpl messageId = MessageIdImpl.convertToMessageIdImpl(msgId);
+                            if (messageId == null || messageId.getEntryId() < 0) {
+                                booleanFuture.complete(false);
                             } else {
                                 booleanFuture.complete(resetIncludeHead);
                             }