You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/28 00:32:37 UTC
[pulsar] branch master updated: support peek broker entry metadata
(#9067)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 50dcce8 support peek broker entry metadata (#9067)
50dcce8 is described below
commit 50dcce8e8bf359ef016a29c69143c29b5768771e
Author: Aloys <lo...@gmail.com>
AuthorDate: Mon Dec 28 08:31:54 2020 +0800
support peek broker entry metadata (#9067)
### Motivation
This pull requeset adds `Commands.peekBrokerEntryMetadataIfExist` method to support peek broker entry metadata.
This will be used in protocol handler KOP.
Currently, when handles `FetchRequest`, KOP will
1. firset read `Entry` out of ledger
2. get offset from `Enty` for caculating next offset to read from and building `MemoryRecordsBuilder`
3. traverse every `Entry` and add each message into `MemoryRecordsBuilder`
we need the `Entry` twice, first for peeking the offset and second for buiding `MemoryRecordsBuilder`.
So, it's better that pulsar `Commands` can support only peek the broker entry metadata without changing the original entry content.
---
.../apache/pulsar/common/protocol/Commands.java | 9 ++++++
.../pulsar/common/protocol/CommandUtilsTests.java | 32 ++++++++++++++++++++++
2 files changed, 41 insertions(+)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index e697045..d9c440a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -2030,6 +2030,15 @@ public class Commands {
}
}
+ public static PulsarApi.BrokerEntryMetadata peekBrokerEntryMetadataIfExist(
+ ByteBuf headerAndPayloadWithBrokerEntryMetadata) {
+ final int readerIndex = headerAndPayloadWithBrokerEntryMetadata.readerIndex();
+ PulsarApi.BrokerEntryMetadata entryMetadata =
+ parseBrokerEntryMetadataIfExist(headerAndPayloadWithBrokerEntryMetadata);
+ headerAndPayloadWithBrokerEntryMetadata.readerIndex(readerIndex);
+ return entryMetadata;
+ }
+
public static ByteBuf serializeMetadataAndPayload(ChecksumType checksumType,
MessageMetadata msgMetadata, ByteBuf payload) {
// / Wire format
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
index 2bd4df0..c7fcc71 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
@@ -202,6 +202,38 @@ public class CommandUtilsTests {
assertEquals(new String(content, StandardCharsets.UTF_8), data);
}
+ @Test
+ public void testPeekBrokerEntryMetadata() throws Exception {
+ int MOCK_BATCH_SIZE = 10;
+ String data = "test-message";
+ ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
+ byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
+ ByteBuf dataWithBrokerEntryMetadata =
+ Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors(), MOCK_BATCH_SIZE);
+ int bytesBeforePeek = dataWithBrokerEntryMetadata.readableBytes();
+ PulsarApi.BrokerEntryMetadata brokerMetadata =
+ Commands.peekBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata);
+
+ assertTrue(brokerMetadata.getBrokerTimestamp() <= System.currentTimeMillis());
+ assertEquals(brokerMetadata.getIndex(), MOCK_BATCH_SIZE - 1);
+
+ int bytesAfterPeek = dataWithBrokerEntryMetadata.readableBytes();
+ assertEquals(bytesBeforePeek, bytesAfterPeek);
+
+ // test parse logic after peek
+
+ PulsarApi.BrokerEntryMetadata brokerMetadata1 =
+ Commands.parseBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata);
+ assertTrue(brokerMetadata1.getBrokerTimestamp() <= System.currentTimeMillis());
+
+ assertEquals(brokerMetadata1.getIndex(), MOCK_BATCH_SIZE - 1);
+ assertEquals(data.length(), dataWithBrokerEntryMetadata.readableBytes());
+
+ byte [] content = new byte[dataWithBrokerEntryMetadata.readableBytes()];
+ dataWithBrokerEntryMetadata.readBytes(content);
+ assertEquals(new String(content, StandardCharsets.UTF_8), data);
+ }
+
public Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
Set<String> interceptorNames = new HashSet<>();
interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");