You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/10/24 10:02:59 UTC
[pulsar] branch branch-2.11 updated: [fix][client] Fix RawReader hasMessageAvailable returns true when no messages (#21032)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new c8abba8f1ea [fix][client] Fix RawReader hasMessageAvailable returns true when no messages (#21032)
c8abba8f1ea is described below
commit c8abba8f1ea3fefba8a0338b4eee70a08b1a4f26
Author: Jiwe Guo <te...@apache.org>
AuthorDate: Tue Oct 24 18:02:38 2023 +0800
[fix][client] Fix RawReader hasMessageAvailable returns true when no messages (#21032)
---
.../src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 331a353b9bd..8faf02c81b3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -151,14 +151,13 @@ public class RawReaderImpl implements RawReader {
// TODO message validation
numMsg = 1;
}
+ MessageIdData messageId = messageAndCnx.msg.getMessageIdData();
+ lastDequeuedMessageId = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
+ messageId.getPartition(), numMsg - 1);
if (!future.complete(messageAndCnx.msg)) {
messageAndCnx.msg.close();
closeAsync();
}
- MessageIdData messageId = messageAndCnx.msg.getMessageIdData();
- lastDequeuedMessageId = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
- messageId.getPartition(), numMsg - 1);
-
ClientCnx currentCnx = cnx();
if (currentCnx == messageAndCnx.cnx) {
increaseAvailablePermits(currentCnx, numMsg);