You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/12/13 17:35:29 UTC

[pulsar] branch master updated: Share the batch metadata object across all messages in batch (#3156)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e41f70  Share the batch metadata object across all messages in batch (#3156)
5e41f70 is described below

commit 5e41f70d45b309ad55b238a320689faf48d99b14
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Dec 13 09:35:24 2018 -0800

    Share the batch metadata object across all messages in batch (#3156)
---
 .../pulsar/common/api/raw/MessageParser.java       | 22 +++++-----
 .../pulsar/common/api/raw/RawMessageImpl.java      | 26 +++++++-----
 .../common/api/raw/ReferenceCountedObject.java     | 49 ++++++++++++++++++++++
 3 files changed, 76 insertions(+), 21 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
index 1280837..d894948 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
@@ -23,6 +23,7 @@ import static org.apache.pulsar.common.api.Commands.hasChecksum;
 import static org.apache.pulsar.common.api.Commands.readChecksum;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
 
 import java.io.IOException;
 
@@ -53,6 +54,7 @@ public class MessageParser {
         MessageMetadata msgMetadata = null;
         ByteBuf payload = headersAndPayload;
         ByteBuf uncompressedPayload = null;
+        ReferenceCountedObject<MessageMetadata> refCntMsgMetadata = null;
 
         try {
             if (!verifyChecksum(topicName, headersAndPayload, ledgerId, entryId)) {
@@ -80,20 +82,20 @@ public class MessageParser {
             }
 
             final int numMessages = msgMetadata.getNumMessagesInBatch();
+            refCntMsgMetadata = new ReferenceCountedObject<>(msgMetadata, (x) -> x.recycle());
 
             if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
-
-                processor.process(RawMessageImpl.get(msgMetadata, null, uncompressedPayload, ledgerId, entryId, 0));
+                processor.process(
+                        RawMessageImpl.get(refCntMsgMetadata, null, uncompressedPayload, ledgerId, entryId, 0));
             } else {
                 // handle batch message enqueuing; uncompressed payload has all messages in batch
-                receiveIndividualMessagesFromBatch(msgMetadata, uncompressedPayload, ledgerId, entryId, processor);
-            }
-        } finally {
-            if (uncompressedPayload != null) {
-                uncompressedPayload.release();
+                receiveIndividualMessagesFromBatch(refCntMsgMetadata, uncompressedPayload, ledgerId, entryId, processor);
             }
 
-            msgMetadata.recycle();
+
+        } finally {
+            ReferenceCountUtil.safeRelease(uncompressedPayload);
+            ReferenceCountUtil.safeRelease(refCntMsgMetadata);
         }
     }
 
@@ -134,9 +136,9 @@ public class MessageParser {
         }
     }
 
-    private static void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata,
+    private static void receiveIndividualMessagesFromBatch(ReferenceCountedObject<MessageMetadata> msgMetadata,
             ByteBuf uncompressedPayload, long ledgerId, long entryId, MessageProcessor processor) {
-        int batchSize = msgMetadata.getNumMessagesInBatch();
+        int batchSize = msgMetadata.get().getNumMessagesInBatch();
 
         try {
             for (int i = 0; i < batchSize; ++i) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
index 6915124..282945f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -35,7 +35,7 @@ public class RawMessageImpl implements RawMessage {
 
     private final RawMessageIdImpl messageId = new RawMessageIdImpl();
 
-    private MessageMetadata msgMetadata;
+    private ReferenceCountedObject<MessageMetadata> msgMetadata;
     private PulsarApi.SingleMessageMetadata.Builder singleMessageMetadata;
     private ByteBuf payload;
 
@@ -54,6 +54,9 @@ public class RawMessageImpl implements RawMessage {
 
     @Override
     public void release() {
+        msgMetadata.release();
+        msgMetadata = null;
+
         if (singleMessageMetadata != null) {
             singleMessageMetadata.recycle();
             singleMessageMetadata = null;
@@ -63,12 +66,13 @@ public class RawMessageImpl implements RawMessage {
         handle.recycle(this);
     }
 
-    public static RawMessage get(MessageMetadata msgMetadata,
+    public static RawMessage get(ReferenceCountedObject<MessageMetadata> msgMetadata,
             PulsarApi.SingleMessageMetadata.Builder singleMessageMetadata,
             ByteBuf payload,
             long ledgerId, long entryId, long batchIndex) {
         RawMessageImpl msg = RECYCLER.get();
         msg.msgMetadata = msgMetadata;
+        msg.msgMetadata.retain();
         msg.singleMessageMetadata = singleMessageMetadata;
         msg.messageId.ledgerId = ledgerId;
         msg.messageId.entryId = entryId;
@@ -82,8 +86,8 @@ public class RawMessageImpl implements RawMessage {
         if (singleMessageMetadata != null && singleMessageMetadata.getPropertiesCount() > 0) {
             return singleMessageMetadata.getPropertiesList().stream()
                     .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue));
-        } else if (msgMetadata.getPropertiesCount() > 0) {
-            return msgMetadata.getPropertiesList().stream()
+        } else if (msgMetadata.get().getPropertiesCount() > 0) {
+            return msgMetadata.get().getPropertiesList().stream()
                     .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue));
         } else {
             return Collections.emptyMap();
@@ -102,15 +106,15 @@ public class RawMessageImpl implements RawMessage {
 
     @Override
     public long getPublishTime() {
-        return msgMetadata.getPublishTime();
+        return msgMetadata.get().getPublishTime();
     }
 
     @Override
     public long getEventTime() {
         if (singleMessageMetadata != null && singleMessageMetadata.hasEventTime()) {
             return singleMessageMetadata.getEventTime();
-        } else if (msgMetadata.hasEventTime()) {
-            return msgMetadata.getEventTime();
+        } else if (msgMetadata.get().hasEventTime()) {
+            return msgMetadata.get().getEventTime();
         } else {
             return 0;
         }
@@ -118,20 +122,20 @@ public class RawMessageImpl implements RawMessage {
 
     @Override
     public long getSequenceId() {
-        return msgMetadata.getSequenceId() + messageId.batchIndex;
+        return msgMetadata.get().getSequenceId() + messageId.batchIndex;
     }
 
     @Override
     public String getProducerName() {
-        return msgMetadata.getProducerName();
+        return msgMetadata.get().getProducerName();
     }
 
     @Override
     public Optional<String> getKey() {
         if (singleMessageMetadata != null && singleMessageMetadata.hasPartitionKey()) {
             return Optional.of(singleMessageMetadata.getPartitionKey());
-        } else if (msgMetadata.hasPartitionKey()){
-            return Optional.of(msgMetadata.getPartitionKey());
+        } else if (msgMetadata.get().hasPartitionKey()){
+            return Optional.of(msgMetadata.get().getPartitionKey());
         } else {
             return Optional.empty();
         }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedObject.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedObject.java
new file mode 100644
index 0000000..7c58013
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedObject.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.api.raw;
+
+import io.netty.util.AbstractReferenceCounted;
+import io.netty.util.ReferenceCounted;
+
+import java.util.function.Consumer;
+
+public class ReferenceCountedObject<T> extends AbstractReferenceCounted {
+
+    private final T object;
+    private final Consumer<T> destructor;
+
+    public ReferenceCountedObject(T object, Consumer<T> destructor) {
+        this.object = object;
+        this.destructor = destructor;
+    }
+
+    public T get() {
+        return object;
+    }
+
+    @Override
+    public ReferenceCounted touch(Object hint) {
+        return this;
+    }
+
+    @Override
+    protected void deallocate() {
+        destructor.accept(object);
+    }
+}