You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/28 00:32:37 UTC

[pulsar] branch master updated: support peek broker entry metadata (#9067)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 50dcce8  support peek broker entry metadata (#9067)
50dcce8 is described below

commit 50dcce8e8bf359ef016a29c69143c29b5768771e
Author: Aloys <lo...@gmail.com>
AuthorDate: Mon Dec 28 08:31:54 2020 +0800

    support peek broker entry metadata (#9067)
    
    ### Motivation
    This pull request adds a `Commands.peekBrokerEntryMetadataIfExist` method to support peeking at the broker entry metadata.
    
    This will be used in protocol handler KOP.
    Currently, when handling a `FetchRequest`, KOP will
    1. first read the `Entry` out of the ledger
    2. get the offset from the `Entry` for calculating the next offset to read from and for building the `MemoryRecordsBuilder`
    3. traverse every `Entry` and add each message into the `MemoryRecordsBuilder`
    So we need the `Entry` twice: first for peeking the offset, and second for building the `MemoryRecordsBuilder`.
    
    So it is better for Pulsar's `Commands` to support peeking at the broker entry metadata without changing the original entry content.
---
 .../apache/pulsar/common/protocol/Commands.java    |  9 ++++++
 .../pulsar/common/protocol/CommandUtilsTests.java  | 32 ++++++++++++++++++++++
 2 files changed, 41 insertions(+)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index e697045..d9c440a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -2030,6 +2030,15 @@ public class Commands {
         }
     }
 
+    /**
+     * Reads the broker entry metadata from the given buffer without consuming it.
+     *
+     * <p>Delegates to {@code parseBrokerEntryMetadataIfExist} and then puts the buffer's reader index
+     * back to the position it had on entry, so the same buffer can be parsed again afterwards.
+     * The reader index is restored even when parsing throws, so a failed peek does not leave the
+     * caller's buffer at a moved read position.
+     *
+     * @param headerAndPayloadWithBrokerEntryMetadata buffer to inspect; its reader index is the same
+     *                                                on return as it was on entry
+     * @return the value produced by {@code parseBrokerEntryMetadataIfExist} for this buffer
+     *         (presumably {@code null} when no broker entry metadata is present - confirm against
+     *         that method)
+     */
+    public static PulsarApi.BrokerEntryMetadata peekBrokerEntryMetadataIfExist(
+            ByteBuf headerAndPayloadWithBrokerEntryMetadata) {
+        final int readerIndex = headerAndPayloadWithBrokerEntryMetadata.readerIndex();
+        try {
+            return parseBrokerEntryMetadataIfExist(headerAndPayloadWithBrokerEntryMetadata);
+        } finally {
+            // Always rewind: a peek must not change the read position, including on failure.
+            headerAndPayloadWithBrokerEntryMetadata.readerIndex(readerIndex);
+        }
+    }
+
     public static ByteBuf serializeMetadataAndPayload(ChecksumType checksumType,
                                                       MessageMetadata msgMetadata, ByteBuf payload) {
         // / Wire format
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
index 2bd4df0..c7fcc71 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
@@ -202,6 +202,38 @@ public class CommandUtilsTests {
         assertEquals(new String(content, StandardCharsets.UTF_8), data);
     }
 
+    /**
+     * Peeking must return the broker entry metadata while leaving the number of readable bytes
+     * unchanged, so that a subsequent parse still reads the metadata and exposes the original payload.
+     */
+    @Test
+    public void testPeekBrokerEntryMetadata() throws Exception {
+        int batchSize = 10;
+        String payloadText = "test-message";
+        int capacity = payloadText.length();
+        ByteBuf payload = PulsarByteBufAllocator.DEFAULT.buffer(capacity, capacity);
+        payload.writeBytes(payloadText.getBytes(StandardCharsets.UTF_8));
+        ByteBuf entry = Commands.addBrokerEntryMetadata(payload, getBrokerEntryMetadataInterceptors(), batchSize);
+
+        // Peek: the metadata is returned, but the readable byte count must not change.
+        int readableBeforePeek = entry.readableBytes();
+        PulsarApi.BrokerEntryMetadata peeked = Commands.peekBrokerEntryMetadataIfExist(entry);
+        assertTrue(peeked.getBrokerTimestamp() <= System.currentTimeMillis());
+        assertEquals(peeked.getIndex(), batchSize - 1);
+        int readableAfterPeek = entry.readableBytes();
+        assertEquals(readableBeforePeek, readableAfterPeek);
+
+        // Parse after peek: the same metadata is read again, leaving only the payload readable.
+        PulsarApi.BrokerEntryMetadata parsed = Commands.parseBrokerEntryMetadataIfExist(entry);
+        assertTrue(parsed.getBrokerTimestamp() <= System.currentTimeMillis());
+        assertEquals(parsed.getIndex(), batchSize - 1);
+        assertEquals(payloadText.length(), entry.readableBytes());
+
+        // Whatever remains in the buffer must be exactly the original payload.
+        byte[] remaining = new byte[entry.readableBytes()];
+        entry.readBytes(remaining);
+        assertEquals(new String(remaining, StandardCharsets.UTF_8), payloadText);
+    }
+
     public Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
         Set<String> interceptorNames = new HashSet<>();
         interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");