You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/18 21:27:56 UTC

[GitHub] [pulsar] aloyszhang opened a new pull request #10968: fix parseMessageMetadata error cause by not skip broker entry metadata

aloyszhang opened a new pull request #10968:
URL: https://github.com/apache/pulsar/pull/10968


   
   
   Fixes #10967
   
   ### Motivation
   fix parseMessageMetadata error cause by not skip broker entry metadata 
   
   ### Modifications
   
   try to skip broker entry metadata if exist before parsing message metadata
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower merged pull request #10968: fix parseMessageMetadata error cause by not skip broker entry metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower merged pull request #10968:
URL: https://github.com/apache/pulsar/pull/10968


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on a change in pull request #10968: fix parseMessageMetadata error cause by not skip broker entry metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #10968:
URL: https://github.com/apache/pulsar/pull/10968#discussion_r654347222



##########
File path: pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
##########
@@ -483,4 +483,55 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {
             fail();
         }
     }
+
+    @Test(timeOut = 30000)
+    public void testParseMessageMetadataWithBrokerEntryMetadata() {
+        int MOCK_BATCH_SIZE = 10;
+        String data = "test-message";
+        ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
+        byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
+
+        // first, build a message with broker entry metadata
+
+        // build message metadata
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setPublishTime(1)
+                .setProducerName("test")
+                .setSequenceId(1);
+        byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
+
+        // build broker entry metadata
+        BrokerEntryMetadata brokerMetadata = new BrokerEntryMetadata()
+                .setIndex(MOCK_BATCH_SIZE - 1);
+
+        // build final data which contains broker entry metadata
+        int brokerMetaSize = brokerMetadata.getSerializedSize();
+        ByteBuf  brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6);
+        brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
+        brokerMeta.writeInt(brokerMetaSize);
+        brokerMetadata.writeTo(brokerMeta);
+
+        CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+        compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+
+        CompositeByteBuf dupCompositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+        dupCompositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+
+        //second, parse message metadata without skip broker entry metadata
+        try {
+            Commands.skipChecksumIfPresent(compositeByteBuf);
+            int metadataSize = (int) compositeByteBuf.readUnsignedInt();
+            MessageMetadata md = new MessageMetadata();
+            md.parseFrom(compositeByteBuf, metadataSize);
+            Assert.fail("Parse operation should be failed.");
+        } catch (Exception e) {
+            // expected
+        }

Review comment:
       In addition, I think it's better to make some invocations outside the try block like
   
   ```java
           Commands.skipChecksumIfPresent(compositeByteBuf);
           int metadataSize = (int) compositeByteBuf.readUnsignedInt();
           MessageMetadata md = new MessageMetadata();
           try {
               md.parseFrom(compositeByteBuf, metadataSize);
               Assert.fail("Parse operation should be failed.");
           } catch (IllegalArgumentException e) {
               // expected
           }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on pull request #10968: fix parseMessageMetadata error cause by not skip broker entry metadata

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #10968:
URL: https://github.com/apache/pulsar/pull/10968#issuecomment-863808608






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on pull request #10968: fix parseMessageMetadata error cause by not skip broker entry metadata

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #10968:
URL: https://github.com/apache/pulsar/pull/10968#issuecomment-863800403






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on a change in pull request #10968: fix parseMessageMetadata error cause by not skip broker entry metadata

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #10968:
URL: https://github.com/apache/pulsar/pull/10968#discussion_r654361204



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
##########
@@ -109,4 +109,24 @@ public void testPeekMessage() throws Exception {
         Assert.assertEquals(messages.size(), 1);
         Assert.assertEquals(messages.get(0).getData(), "hello".getBytes());
     }
+
+    @Test(timeOut = 20000)
+    public void testGetLastMessageId() throws Exception {
+        final String topic = "persistent://prop/ns-abc/topic-test";
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        producer.newMessage().value("hello".getBytes()).send();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subscription)
+                .subscribe();
+        System.out.println(consumer.getLastMessageId());

Review comment:
       removed

##########
File path: pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
##########
@@ -483,4 +483,55 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {
             fail();
         }
     }
+
+    @Test(timeOut = 30000)
+    public void testParseMessageMetadataWithBrokerEntryMetadata() {
+        int MOCK_BATCH_SIZE = 10;
+        String data = "test-message";
+        ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
+        byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
+
+        // first, build a message with broker entry metadata
+
+        // build message metadata
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setPublishTime(1)
+                .setProducerName("test")
+                .setSequenceId(1);
+        byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
+
+        // build broker entry metadata
+        BrokerEntryMetadata brokerMetadata = new BrokerEntryMetadata()
+                .setIndex(MOCK_BATCH_SIZE - 1);
+
+        // build final data which contains broker entry metadata
+        int brokerMetaSize = brokerMetadata.getSerializedSize();
+        ByteBuf  brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6);
+        brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
+        brokerMeta.writeInt(brokerMetaSize);
+        brokerMetadata.writeTo(brokerMeta);
+
+        CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+        compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+
+        CompositeByteBuf dupCompositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+        dupCompositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+
+        //second, parse message metadata without skip broker entry metadata
+        try {
+            Commands.skipChecksumIfPresent(compositeByteBuf);
+            int metadataSize = (int) compositeByteBuf.readUnsignedInt();
+            MessageMetadata md = new MessageMetadata();
+            md.parseFrom(compositeByteBuf, metadataSize);
+            Assert.fail("Parse operation should be failed.");
+        } catch (Exception e) {
+            // expected
+        }

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #10968: fix parseMessageMetadata error cause by not skip broker entry metadata

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #10968:
URL: https://github.com/apache/pulsar/pull/10968#discussion_r654334240



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
##########
@@ -109,4 +109,24 @@ public void testPeekMessage() throws Exception {
         Assert.assertEquals(messages.size(), 1);
         Assert.assertEquals(messages.get(0).getData(), "hello".getBytes());
     }
+
+    @Test(timeOut = 20000)
+    public void testGetLastMessageId() throws Exception {
+        final String topic = "persistent://prop/ns-abc/topic-test";
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        producer.newMessage().value("hello".getBytes()).send();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subscription)
+                .subscribe();
+        System.out.println(consumer.getLastMessageId());

Review comment:
       Please remove System.out.println

##########
File path: pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
##########
@@ -483,4 +483,55 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {
             fail();
         }
     }
+
+    @Test(timeOut = 30000)
+    public void testParseMessageMetadataWithBrokerEntryMetadata() {
+        int MOCK_BATCH_SIZE = 10;
+        String data = "test-message";
+        ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
+        byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
+
+        // first, build a message with broker entry metadata
+
+        // build message metadata
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setPublishTime(1)
+                .setProducerName("test")
+                .setSequenceId(1);
+        byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
+
+        // build broker entry metadata
+        BrokerEntryMetadata brokerMetadata = new BrokerEntryMetadata()
+                .setIndex(MOCK_BATCH_SIZE - 1);
+
+        // build final data which contains broker entry metadata
+        int brokerMetaSize = brokerMetadata.getSerializedSize();
+        ByteBuf  brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6);
+        brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
+        brokerMeta.writeInt(brokerMetaSize);
+        brokerMetadata.writeTo(brokerMeta);
+
+        CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+        compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+
+        CompositeByteBuf dupCompositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+        dupCompositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+
+        //second, parse message metadata without skip broker entry metadata
+        try {
+            Commands.skipChecksumIfPresent(compositeByteBuf);
+            int metadataSize = (int) compositeByteBuf.readUnsignedInt();
+            MessageMetadata md = new MessageMetadata();
+            md.parseFrom(compositeByteBuf, metadataSize);
+            Assert.fail("Parse operation should be failed.");
+        } catch (Exception e) {
+            // expected
+        }

Review comment:
       Would it make sense to check the type of the exception?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on pull request #10968: fix parseMessageMetadata error cause by not skip broker entry metadata

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #10968:
URL: https://github.com/apache/pulsar/pull/10968#issuecomment-863832357


   I agree with @lhotari, the extra call of `skipBrokerEntryMetadataIfExist` should be removed.
   
   Regarding to the unit tests, I think the existed tests already cover the cases when broker entry metadata is not enabled. For the case that broker entry metadata is enabled, some tests have been added to `BrokerEntryMetadataE2ETest` that could also cover the case. IMO, you can only add tests to cover #10950 since this PR could solve the problem completely.
   
   /cc @wuzhanpeng 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari edited a comment on pull request #10968: fix parseMessageMetadata error cause by not skip broker entry metadata

Posted by GitBox <gi...@apache.org>.
lhotari edited a comment on pull request #10968:
URL: https://github.com/apache/pulsar/pull/10968#issuecomment-863811422


   It seems that `skipBrokerEntryMetadataIfExist` is called separately in many locations in Pulsar code base, before calling `parseMessageMetadata` method. For example:
   
   https://github.com/apache/pulsar/blob/18f2f4a9c1dab7eec5c7c9590b76aca17ee1ead8/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L298-L313
   
   https://github.com/apache/pulsar/blob/202da117b529b24bdf9c994750266dac597294a8/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java#L2454-L2455
   
   https://github.com/apache/pulsar/blob/f25b4a520bb82c6a09d373c9d314ca8e55d1f57b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L1670-L1671
   
   https://github.com/apache/pulsar/blob/f25b4a520bb82c6a09d373c9d314ca8e55d1f57b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L1685-L1686
   
   It seems that this inconsistency would have to be resolved.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org