You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/10/24 10:02:59 UTC

[pulsar] branch branch-2.11 updated: [fix][client] Fix RawReader hasMessageAvailable returns true when no messages (#21032)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new c8abba8f1ea [fix][client] Fix RawReader hasMessageAvailable returns true when no messages (#21032)
c8abba8f1ea is described below

commit c8abba8f1ea3fefba8a0338b4eee70a08b1a4f26
Author: Jiwe Guo <te...@apache.org>
AuthorDate: Tue Oct 24 18:02:38 2023 +0800

    [fix][client] Fix RawReader hasMessageAvailable returns true when no messages (#21032)
---
 .../src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 331a353b9bd..8faf02c81b3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -151,14 +151,13 @@ public class RawReaderImpl implements RawReader {
                     // TODO message validation
                     numMsg = 1;
                 }
+                MessageIdData messageId = messageAndCnx.msg.getMessageIdData();
+                lastDequeuedMessageId = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
+                        messageId.getPartition(), numMsg - 1);
                 if (!future.complete(messageAndCnx.msg)) {
                     messageAndCnx.msg.close();
                     closeAsync();
                 }
-                MessageIdData messageId = messageAndCnx.msg.getMessageIdData();
-                lastDequeuedMessageId = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
-                    messageId.getPartition(), numMsg - 1);
-
                 ClientCnx currentCnx = cnx();
                 if (currentCnx == messageAndCnx.cnx) {
                     increaseAvailablePermits(currentCnx, numMsg);