You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/07/15 10:00:04 UTC

[pulsar] branch branch-2.9 updated: [fix][pulsar-broker] Fix RawReader hasMessageAvailable returns true when no messages (#16443)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new d5139b90627 [fix][pulsar-broker] Fix RawReader hasMessageAvailable returns true when no messages (#16443)
d5139b90627 is described below

commit d5139b906271a7bf9980f42b8e11ae4f5aaec793
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Jul 14 16:21:28 2022 +0800

    [fix][pulsar-broker] Fix RawReader hasMessageAvailable returns true when no messages (#16443)
    
    * [fix][pulsar-broker] Fix RawReader hasMessageAvailable returns true when no messages
    ---
    
    *Motivation*
    
    The RawReader hasMessageAvailable returns true even after all the
    messages have been consumed. That causes readNextAsync to block
    forever, and the process never recovers.
    In the ConsumerImpl, we update the lastDequeuedMessageId in the
    messageProcess. Because the messageReceived method is overridden by
    the RawReader, we should update the lastDequeuedMessageId there as well.
    
    * Fix the hasMessageAvailable in batch messages
    
    (cherry picked from commit 1d174db872914afbafb03607a29d7a2396d87a33)
---
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  3 ++
 .../apache/pulsar/client/impl/RawReaderTest.java   | 52 ++++++++++++++++++++++
 2 files changed, 55 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index bf9fbec59f6..30944289dc3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -154,6 +154,9 @@ public class RawReaderImpl implements RawReader {
                     messageAndCnx.msg.close();
                     closeAsync();
                 }
+                MessageIdData messageId = messageAndCnx.msg.getMessageIdData();
+                lastDequeuedMessageId = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
+                    messageId.getPartition(), numMsg - 1);
 
                 ClientCnx currentCnx = cnx();
                 if (currentCnx == messageAndCnx.cnx) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index de9eb80a26c..75aa3ee5941 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -107,6 +107,58 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
         return msgMetadata.getPartitionKey();
     }
 
+    @Test
+    public void testHasMessageAvailableWithoutBatch() throws Exception {
+        int numKeys = 10;
+        String topic = "persistent://my-property/my-ns/my-raw-topic";
+        Set<String> keys = publishMessages(topic, numKeys);
+        RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
+        while (true) {
+            boolean hasMsg = reader.hasMessageAvailableAsync().get();
+            if (hasMsg && keys.isEmpty()) {
+                Assert.fail("HasMessageAvailable shows still has message when there is no message");
+            }
+            if (hasMsg) {
+                try (RawMessage m = reader.readNextAsync().get()) {
+                    Assert.assertTrue(keys.remove(extractKey(m)));
+                }
+            } else {
+                break;
+            }
+        }
+        Assert.assertTrue(keys.isEmpty());
+    }
+
+    @Test
+    public void testHasMessageAvailableWithBatch() throws Exception {
+        int numKeys = 20;
+        String topic = "persistent://my-property/my-ns/my-raw-topic";
+        Set<String> keys = publishMessages(topic, numKeys, true);
+        RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
+        int messageCount = 0;
+        while (true) {
+            boolean hasMsg = reader.hasMessageAvailableAsync().get();
+            if (hasMsg && (messageCount == numKeys)) {
+                Assert.fail("HasMessageAvailable shows still has message when there is no message");
+            }
+            if (hasMsg) {
+                try (RawMessage m = reader.readNextAsync().get()) {
+                    MessageMetadata meta = Commands.parseMessageMetadata(m.getHeadersAndPayload());
+                    messageCount += meta.getNumMessagesInBatch();
+                    RawBatchConverter.extractIdsAndKeysAndSize(m).forEach(batchInfo -> {
+                        String key = batchInfo.getMiddle();
+                        Assert.assertTrue(keys.remove(key));
+                    });
+
+                }
+            } else {
+                break;
+            }
+        }
+        Assert.assertEquals(messageCount, numKeys);
+        Assert.assertTrue(keys.isEmpty());
+    }
+
     @Test
     public void testRawReader() throws Exception {
         int numKeys = 10;