You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/18 20:59:08 UTC

[GitHub] [pulsar] lhotari commented on a change in pull request #10968: fix parseMessageMetadata error cause by not skip broker entry metadata

lhotari commented on a change in pull request #10968:
URL: https://github.com/apache/pulsar/pull/10968#discussion_r654334240



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
##########
@@ -109,4 +109,24 @@ public void testPeekMessage() throws Exception {
         Assert.assertEquals(messages.size(), 1);
         Assert.assertEquals(messages.get(0).getData(), "hello".getBytes());
     }
+
+    @Test(timeOut = 20000)
+    public void testGetLastMessageId() throws Exception {
+        final String topic = "persistent://prop/ns-abc/topic-test";
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        producer.newMessage().value("hello".getBytes()).send();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subscription)
+                .subscribe();
+        System.out.println(consumer.getLastMessageId());

Review comment:
       Please remove the System.out.println call.

##########
File path: pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
##########
@@ -483,4 +483,55 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {
             fail();
         }
     }
+
+    @Test(timeOut = 30000)
+    public void testParseMessageMetadataWithBrokerEntryMetadata() {
+        int MOCK_BATCH_SIZE = 10;
+        String data = "test-message";
+        ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
+        byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
+
+        // first, build a message with broker entry metadata
+
+        // build message metadata
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setPublishTime(1)
+                .setProducerName("test")
+                .setSequenceId(1);
+        byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
+
+        // build broker entry metadata
+        BrokerEntryMetadata brokerMetadata = new BrokerEntryMetadata()
+                .setIndex(MOCK_BATCH_SIZE - 1);
+
+        // build final data which contains broker entry metadata
+        int brokerMetaSize = brokerMetadata.getSerializedSize();
+        ByteBuf  brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6);
+        brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
+        brokerMeta.writeInt(brokerMetaSize);
+        brokerMetadata.writeTo(brokerMeta);
+
+        CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+        compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+
+        CompositeByteBuf dupCompositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+        dupCompositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+
+        //second, parse message metadata without skip broker entry metadata
+        try {
+            Commands.skipChecksumIfPresent(compositeByteBuf);
+            int metadataSize = (int) compositeByteBuf.readUnsignedInt();
+            MessageMetadata md = new MessageMetadata();
+            md.parseFrom(compositeByteBuf, metadataSize);
+            Assert.fail("Parse operation should be failed.");
+        } catch (Exception e) {
+            // expected
+        }

Review comment:
       Would it make sense to check the type of the exception?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org