You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/25 04:55:58 UTC

[pulsar] 01/13: fix parseMessageMetadata error cause by not skip broker entry metadata (#10968)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9c3e904ae56b660ca7b0f7b201effcc884654fd5
Author: Aloys <lo...@gmail.com>
AuthorDate: Sat Jun 19 02:00:48 2021 +0800

    fix parseMessageMetadata error cause by not skip broker entry metadata (#10968)
    
    Fixes #10967
    
    ### Motivation
    Fix the parseMessageMetadata error caused by not skipping the broker entry metadata.
    
    ### Modifications
    
    Skip the broker entry metadata, if it exists, before parsing the message metadata.
    
    (cherry picked from commit 0774b5fddddc0c9fe9b7cc00ae40e43322690ef1)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  2 -
 .../broker/service/BrokerEntryMetadataE2ETest.java | 20 +++++++++
 .../org/apache/pulsar/client/impl/MessageImpl.java |  2 -
 .../apache/pulsar/client/impl/MessageImplTest.java | 51 ++++++++++++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    |  5 ++-
 5 files changed, 74 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4a03cd1..76211b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2451,8 +2451,6 @@ public class PersistentTopicsBase extends AdminResource {
         PositionImpl pos = (PositionImpl) entry.getPosition();
         ByteBuf metadataAndPayload = entry.getDataBuffer();
 
-        // moves the readerIndex to the payload
-        Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
         MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
 
         ResponseBuilder responseBuilder = Response.ok();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index 5cbaf3d..e7d98a8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -109,4 +109,24 @@ public class BrokerEntryMetadataE2ETest extends BrokerTestBase {
         Assert.assertEquals(messages.size(), 1);
         Assert.assertEquals(messages.get(0).getData(), "hello".getBytes());
     }
+
+    @Test(timeOut = 20000)
+    public void testGetLastMessageId() throws Exception {
+        final String topic = "persistent://prop/ns-abc/topic-test";
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        producer.newMessage().value("hello".getBytes()).send();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subscription)
+                .subscribe();
+        consumer.getLastMessageId();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index efb66e5..c9370f3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -300,8 +300,6 @@ public class MessageImpl<T> implements Message<T> {
         @SuppressWarnings("unchecked")
         MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();
 
-        Commands.skipBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);
-
         Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata);
         msg.payload = headersAndPayloadWithBrokerEntryMetadata;
         msg.messageId = null;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index 0a57d93..17b77a2 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -483,4 +483,55 @@ public class MessageImplTest {
             fail();
         }
     }
+
+    @Test(timeOut = 30000)
+    public void testParseMessageMetadataWithBrokerEntryMetadata() {
+        int MOCK_BATCH_SIZE = 10;
+        String data = "test-message";
+        ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
+        byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
+
+        // first, build a message with broker entry metadata
+
+        // build message metadata
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setPublishTime(1)
+                .setProducerName("test")
+                .setSequenceId(1);
+        byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
+
+        // build broker entry metadata
+        BrokerEntryMetadata brokerMetadata = new BrokerEntryMetadata()
+                .setIndex(MOCK_BATCH_SIZE - 1);
+
+        // build final data which contains broker entry metadata
+        int brokerMetaSize = brokerMetadata.getSerializedSize();
+        ByteBuf  brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6);
+        brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
+        brokerMeta.writeInt(brokerMetaSize);
+        brokerMetadata.writeTo(brokerMeta);
+
+        CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+        compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+
+        CompositeByteBuf dupCompositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+        dupCompositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+
+        //second, parse message metadata without skip broker entry metadata
+        Commands.skipChecksumIfPresent(compositeByteBuf);
+        int metadataSize = (int) compositeByteBuf.readUnsignedInt();
+        MessageMetadata md = new MessageMetadata();
+        try {
+            md.parseFrom(compositeByteBuf, metadataSize);
+            Assert.fail("Parse operation should be failed.");
+        } catch (IllegalArgumentException e) {
+            // expected
+        }
+
+        //third, parse message metadata with skip broker entry metadata first
+        MessageMetadata metadata = Commands.parseMessageMetadata(dupCompositeByteBuf);
+        assertEquals(metadata.getPublishTime(), 1);
+        assertEquals(metadata.getProducerName(), "test");
+        assertEquals(metadata.getSequenceId(), 1);
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 0c80395..7c93ecc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -418,6 +418,9 @@ public class Commands {
     }
 
     public static void parseMessageMetadata(ByteBuf buffer, MessageMetadata msgMetadata) {
+        // initially reader-index may point to start of broker entry metadata :
+        // increment reader-index to start_of_headAndPayload to parse metadata
+        skipBrokerEntryMetadataIfExist(buffer);
         // initially reader-index may point to start_of_checksum : increment reader-index to start_of_metadata
         // to parse metadata
         skipChecksumIfPresent(buffer);
@@ -1667,7 +1670,6 @@ public class Commands {
         try {
             // save the reader index and restore after parsing
             int readerIdx = metadataAndPayload.readerIndex();
-            skipBrokerEntryMetadataIfExist(metadataAndPayload);
             MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
             metadataAndPayload.readerIndex(readerIdx);
 
@@ -1682,7 +1684,6 @@ public class Commands {
     public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) {
         try {
             int readerIdx = metadataAndPayload.readerIndex();
-            skipBrokerEntryMetadataIfExist(metadataAndPayload);
             MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
             metadataAndPayload.readerIndex(readerIdx);
             if (metadata.hasOrderingKey()) {