You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/12/13 17:35:29 UTC
[pulsar] branch master updated: Share the batch metadata object
across all messages in batch (#3156)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5e41f70 Share the batch metadata object across all messages in batch (#3156)
5e41f70 is described below
commit 5e41f70d45b309ad55b238a320689faf48d99b14
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Dec 13 09:35:24 2018 -0800
Share the batch metadata object across all messages in batch (#3156)
---
.../pulsar/common/api/raw/MessageParser.java | 22 +++++-----
.../pulsar/common/api/raw/RawMessageImpl.java | 26 +++++++-----
.../common/api/raw/ReferenceCountedObject.java | 49 ++++++++++++++++++++++
3 files changed, 76 insertions(+), 21 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
index 1280837..d894948 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
@@ -23,6 +23,7 @@ import static org.apache.pulsar.common.api.Commands.hasChecksum;
import static org.apache.pulsar.common.api.Commands.readChecksum;
import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
@@ -53,6 +54,7 @@ public class MessageParser {
MessageMetadata msgMetadata = null;
ByteBuf payload = headersAndPayload;
ByteBuf uncompressedPayload = null;
+ ReferenceCountedObject<MessageMetadata> refCntMsgMetadata = null;
try {
if (!verifyChecksum(topicName, headersAndPayload, ledgerId, entryId)) {
@@ -80,20 +82,20 @@ public class MessageParser {
}
final int numMessages = msgMetadata.getNumMessagesInBatch();
+ refCntMsgMetadata = new ReferenceCountedObject<>(msgMetadata, (x) -> x.recycle());
if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
-
- processor.process(RawMessageImpl.get(msgMetadata, null, uncompressedPayload, ledgerId, entryId, 0));
+ processor.process(
+ RawMessageImpl.get(refCntMsgMetadata, null, uncompressedPayload, ledgerId, entryId, 0));
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
- receiveIndividualMessagesFromBatch(msgMetadata, uncompressedPayload, ledgerId, entryId, processor);
- }
- } finally {
- if (uncompressedPayload != null) {
- uncompressedPayload.release();
+ receiveIndividualMessagesFromBatch(refCntMsgMetadata, uncompressedPayload, ledgerId, entryId, processor);
}
- msgMetadata.recycle();
+
+ } finally {
+ ReferenceCountUtil.safeRelease(uncompressedPayload);
+ ReferenceCountUtil.safeRelease(refCntMsgMetadata);
}
}
@@ -134,9 +136,9 @@ public class MessageParser {
}
}
- private static void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata,
+ private static void receiveIndividualMessagesFromBatch(ReferenceCountedObject<MessageMetadata> msgMetadata,
ByteBuf uncompressedPayload, long ledgerId, long entryId, MessageProcessor processor) {
- int batchSize = msgMetadata.getNumMessagesInBatch();
+ int batchSize = msgMetadata.get().getNumMessagesInBatch();
try {
for (int i = 0; i < batchSize; ++i) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
index 6915124..282945f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -35,7 +35,7 @@ public class RawMessageImpl implements RawMessage {
private final RawMessageIdImpl messageId = new RawMessageIdImpl();
- private MessageMetadata msgMetadata;
+ private ReferenceCountedObject<MessageMetadata> msgMetadata;
private PulsarApi.SingleMessageMetadata.Builder singleMessageMetadata;
private ByteBuf payload;
@@ -54,6 +54,9 @@ public class RawMessageImpl implements RawMessage {
@Override
public void release() {
+ msgMetadata.release();
+ msgMetadata = null;
+
if (singleMessageMetadata != null) {
singleMessageMetadata.recycle();
singleMessageMetadata = null;
@@ -63,12 +66,13 @@ public class RawMessageImpl implements RawMessage {
handle.recycle(this);
}
- public static RawMessage get(MessageMetadata msgMetadata,
+ public static RawMessage get(ReferenceCountedObject<MessageMetadata> msgMetadata,
PulsarApi.SingleMessageMetadata.Builder singleMessageMetadata,
ByteBuf payload,
long ledgerId, long entryId, long batchIndex) {
RawMessageImpl msg = RECYCLER.get();
msg.msgMetadata = msgMetadata;
+ msg.msgMetadata.retain();
msg.singleMessageMetadata = singleMessageMetadata;
msg.messageId.ledgerId = ledgerId;
msg.messageId.entryId = entryId;
@@ -82,8 +86,8 @@ public class RawMessageImpl implements RawMessage {
if (singleMessageMetadata != null && singleMessageMetadata.getPropertiesCount() > 0) {
return singleMessageMetadata.getPropertiesList().stream()
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue));
- } else if (msgMetadata.getPropertiesCount() > 0) {
- return msgMetadata.getPropertiesList().stream()
+ } else if (msgMetadata.get().getPropertiesCount() > 0) {
+ return msgMetadata.get().getPropertiesList().stream()
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue));
} else {
return Collections.emptyMap();
@@ -102,15 +106,15 @@ public class RawMessageImpl implements RawMessage {
@Override
public long getPublishTime() {
- return msgMetadata.getPublishTime();
+ return msgMetadata.get().getPublishTime();
}
@Override
public long getEventTime() {
if (singleMessageMetadata != null && singleMessageMetadata.hasEventTime()) {
return singleMessageMetadata.getEventTime();
- } else if (msgMetadata.hasEventTime()) {
- return msgMetadata.getEventTime();
+ } else if (msgMetadata.get().hasEventTime()) {
+ return msgMetadata.get().getEventTime();
} else {
return 0;
}
@@ -118,20 +122,20 @@ public class RawMessageImpl implements RawMessage {
@Override
public long getSequenceId() {
- return msgMetadata.getSequenceId() + messageId.batchIndex;
+ return msgMetadata.get().getSequenceId() + messageId.batchIndex;
}
@Override
public String getProducerName() {
- return msgMetadata.getProducerName();
+ return msgMetadata.get().getProducerName();
}
@Override
public Optional<String> getKey() {
if (singleMessageMetadata != null && singleMessageMetadata.hasPartitionKey()) {
return Optional.of(singleMessageMetadata.getPartitionKey());
- } else if (msgMetadata.hasPartitionKey()){
- return Optional.of(msgMetadata.getPartitionKey());
+ } else if (msgMetadata.get().hasPartitionKey()){
+ return Optional.of(msgMetadata.get().getPartitionKey());
} else {
return Optional.empty();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedObject.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedObject.java
new file mode 100644
index 0000000..7c58013
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedObject.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.api.raw;
+
+import io.netty.util.AbstractReferenceCounted;
+import io.netty.util.ReferenceCounted;
+
+import java.util.function.Consumer;
+
+public class ReferenceCountedObject<T> extends AbstractReferenceCounted {
+
+ private final T object;
+ private final Consumer<T> destructor;
+
+ public ReferenceCountedObject(T object, Consumer<T> destructor) {
+ this.object = object;
+ this.destructor = destructor;
+ }
+
+ public T get() {
+ return object;
+ }
+
+ @Override
+ public ReferenceCounted touch(Object hint) {
+ return this;
+ }
+
+ @Override
+ protected void deallocate() {
+ destructor.accept(object);
+ }
+}