You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/25 04:55:58 UTC
[pulsar] 01/13: fix parseMessageMetadata error caused by not skipping
broker entry metadata (#10968)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9c3e904ae56b660ca7b0f7b201effcc884654fd5
Author: Aloys <lo...@gmail.com>
AuthorDate: Sat Jun 19 02:00:48 2021 +0800
Fix parseMessageMetadata error caused by not skipping broker entry metadata (#10968)
Fixes #10967
### Motivation
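Fix the parseMessageMetadata error caused by not skipping the broker entry metadata. When a buffer starts with broker entry metadata and the caller does not skip it first, the message metadata is parsed from the wrong offset and the parse fails.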
### Modifications
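Skip the broker entry metadata, if it exists, inside Commands.parseMessageMetadata before parsing the message metadata, and remove the now-redundant skip calls from its callers.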
(cherry picked from commit 0774b5fddddc0c9fe9b7cc00ae40e43322690ef1)
---
.../broker/admin/impl/PersistentTopicsBase.java | 2 -
.../broker/service/BrokerEntryMetadataE2ETest.java | 20 +++++++++
.../org/apache/pulsar/client/impl/MessageImpl.java | 2 -
.../apache/pulsar/client/impl/MessageImplTest.java | 51 ++++++++++++++++++++++
.../apache/pulsar/common/protocol/Commands.java | 5 ++-
5 files changed, 74 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4a03cd1..76211b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2451,8 +2451,6 @@ public class PersistentTopicsBase extends AdminResource {
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();
- // moves the readerIndex to the payload
- Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
ResponseBuilder responseBuilder = Response.ok();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index 5cbaf3d..e7d98a8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -109,4 +109,24 @@ public class BrokerEntryMetadataE2ETest extends BrokerTestBase {
Assert.assertEquals(messages.size(), 1);
Assert.assertEquals(messages.get(0).getData(), "hello".getBytes());
}
+
+ @Test(timeOut = 20000)
+ public void testGetLastMessageId() throws Exception {
+ final String topic = "persistent://prop/ns-abc/topic-test";
+ final String subscription = "my-sub";
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .create();
+ producer.newMessage().value("hello".getBytes()).send();
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName(subscription)
+ .subscribe();
+ consumer.getLastMessageId();
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index efb66e5..c9370f3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -300,8 +300,6 @@ public class MessageImpl<T> implements Message<T> {
@SuppressWarnings("unchecked")
MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();
- Commands.skipBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);
-
Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata);
msg.payload = headersAndPayloadWithBrokerEntryMetadata;
msg.messageId = null;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index 0a57d93..17b77a2 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -483,4 +483,55 @@ public class MessageImplTest {
fail();
}
}
+
+ @Test(timeOut = 30000)
+ public void testParseMessageMetadataWithBrokerEntryMetadata() {
+ int MOCK_BATCH_SIZE = 10;
+ String data = "test-message";
+ ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
+ byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
+
+ // first, build a message with broker entry metadata
+
+ // build message metadata
+ MessageMetadata messageMetadata = new MessageMetadata()
+ .setPublishTime(1)
+ .setProducerName("test")
+ .setSequenceId(1);
+ byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
+
+ // build broker entry metadata
+ BrokerEntryMetadata brokerMetadata = new BrokerEntryMetadata()
+ .setIndex(MOCK_BATCH_SIZE - 1);
+
+ // build final data which contains broker entry metadata
+ int brokerMetaSize = brokerMetadata.getSerializedSize();
+ ByteBuf brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6);
+ brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
+ brokerMeta.writeInt(brokerMetaSize);
+ brokerMetadata.writeTo(brokerMeta);
+
+ CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+ compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+
+ CompositeByteBuf dupCompositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+ dupCompositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+
+ //second, parse message metadata without skip broker entry metadata
+ Commands.skipChecksumIfPresent(compositeByteBuf);
+ int metadataSize = (int) compositeByteBuf.readUnsignedInt();
+ MessageMetadata md = new MessageMetadata();
+ try {
+ md.parseFrom(compositeByteBuf, metadataSize);
+ Assert.fail("Parse operation should be failed.");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ //third, parse message metadata with skip broker entry metadata first
+ MessageMetadata metadata = Commands.parseMessageMetadata(dupCompositeByteBuf);
+ assertEquals(metadata.getPublishTime(), 1);
+ assertEquals(metadata.getProducerName(), "test");
+ assertEquals(metadata.getSequenceId(), 1);
+ }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 0c80395..7c93ecc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -418,6 +418,9 @@ public class Commands {
}
public static void parseMessageMetadata(ByteBuf buffer, MessageMetadata msgMetadata) {
+ // initially reader-index may point to start of broker entry metadata :
+ // increment reader-index to start_of_headAndPayload to parse metadata
+ skipBrokerEntryMetadataIfExist(buffer);
// initially reader-index may point to start_of_checksum : increment reader-index to start_of_metadata
// to parse metadata
skipChecksumIfPresent(buffer);
@@ -1667,7 +1670,6 @@ public class Commands {
try {
// save the reader index and restore after parsing
int readerIdx = metadataAndPayload.readerIndex();
- skipBrokerEntryMetadataIfExist(metadataAndPayload);
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.readerIndex(readerIdx);
@@ -1682,7 +1684,6 @@ public class Commands {
public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) {
try {
int readerIdx = metadataAndPayload.readerIndex();
- skipBrokerEntryMetadataIfExist(metadataAndPayload);
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.readerIndex(readerIdx);
if (metadata.hasOrderingKey()) {