You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/29 11:43:49 UTC
[pulsar] branch branch-2.8 updated: Fix typo of the returned last
message ID when the last message ID is from compacted ledger (#12237)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new f844c74 Fix typo of the returned last message ID when the last message ID is from compacted ledger (#12237)
f844c74 is described below
commit f844c74b9c1e0e90cb72971078c9f187601385bb
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Sep 29 19:41:56 2021 +0800
Fix typo of the returned last message ID when the last message ID is from compacted ledger (#12237)
(cherry picked from commit 102e3d22dd2af973778319ab2a89f5995ce4d6d4)
---
.../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 2 +-
.../test/java/org/apache/pulsar/compaction/CompactedTopicTest.java | 6 +++++-
2 files changed, 6 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a6d7f9c..390d493 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1727,7 +1727,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
int bs = metadata.getNumMessagesInBatch();
int largestBatchIndex = bs > 0 ? bs - 1 : -1;
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
- entry.getLedgerId(), entry.getLedgerId(), partitionIndex, largestBatchIndex,
+ entry.getLedgerId(), entry.getEntryId(), partitionIndex, largestBatchIndex,
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
entry.release();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index b70f495..4dc7bf3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.RawMessageImpl;
+import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -413,7 +414,10 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
.create();
Assert.assertTrue(reader.hasMessageAvailable());
- Assert.assertEquals(msg, reader.readNext().getValue());
+ Message<String> received = reader.readNext();
+ Assert.assertEquals(msg, received.getValue());
+ MessageId messageId = ((ReaderImpl<String>) reader).getConsumer().getLastMessageId();
+ Assert.assertEquals(messageId, received.getMessageId());
Assert.assertFalse(reader.hasMessageAvailable());
}
}