You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/29 11:43:49 UTC

[pulsar] branch branch-2.8 updated: Fix typo of the returned last message ID when the last message ID is from compacted ledger (#12237)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new f844c74  Fix typo of the returned last message ID when the last message ID is from compacted ledger (#12237)
f844c74 is described below

commit f844c74b9c1e0e90cb72971078c9f187601385bb
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Sep 29 19:41:56 2021 +0800

    Fix typo of the returned last message ID when the last message ID is from compacted ledger (#12237)
    
    
    (cherry picked from commit 102e3d22dd2af973778319ab2a89f5995ce4d6d4)
---
 .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java   | 2 +-
 .../test/java/org/apache/pulsar/compaction/CompactedTopicTest.java  | 6 +++++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a6d7f9c..390d493 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1727,7 +1727,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                             int bs = metadata.getNumMessagesInBatch();
                             int largestBatchIndex = bs > 0 ? bs - 1 : -1;
                             ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
-                                    entry.getLedgerId(), entry.getLedgerId(), partitionIndex, largestBatchIndex,
+                                    entry.getLedgerId(), entry.getEntryId(), partitionIndex, largestBatchIndex,
                                     markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
                                     markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
                             entry.release();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index b70f495..4dc7bf3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.RawMessageImpl;
+import org.apache.pulsar.client.impl.ReaderImpl;
 import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -413,7 +414,10 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
                 .create();
 
         Assert.assertTrue(reader.hasMessageAvailable());
-        Assert.assertEquals(msg, reader.readNext().getValue());
+        Message<String> received = reader.readNext();
+        Assert.assertEquals(msg, received.getValue());
+        MessageId messageId = ((ReaderImpl<String>) reader).getConsumer().getLastMessageId();
+        Assert.assertEquals(messageId, received.getMessageId());
         Assert.assertFalse(reader.hasMessageAvailable());
     }
 }