You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/06/10 07:12:37 UTC

[pulsar] branch master updated: [fix][broker] Avoid storing `MessageMetadata` instances returned by `peekMessageMetadata` (#15983)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 36690f5a7f7 [fix][broker] Avoid storing `MessageMetadata` instances returned by `peekMessageMetadata` (#15983)
36690f5a7f7 is described below

commit 36690f5a7f79fb744c1d61969f2a277cadd0e704
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Jun 10 15:12:31 2022 +0800

    [fix][broker] Avoid storing `MessageMetadata` instances returned by `peekMessageMetadata` (#15983)
    
    ### Motivation
    
    https://github.com/apache/pulsar/pull/15967 removed the `EntryWrapper`,
    which held an `Entry` instance that was never used. However, after that
    refactoring, the `MessageMetadata` array became useless, see
    https://github.com/apache/pulsar/blob/298a573295f845e46f8a55cee366b6db63e997c2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L517-L519
    
    Each `MessageMetadata` instance in the array is returned by
    `peekMessageMetadata`, whose returned value references a thread local
    object `Commands#LOCAL_MESSAGE_METADATA`. This causes a problem: if
    multiple entries are read, all `MessageMetadata` elements in the array
    reference the same object.
    
    However, the incorrect use of `Optional#orElse` accidentally masks this
    problem. See
    https://github.com/apache/pulsar/blob/298a573295f845e46f8a55cee366b6db63e997c2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L133-L134
    
    Each time `peekMessageMetadata` is called, the thread local message
    metadata is updated. Unlike `orElseGet`, the expression passed to `orElse`
    is always evaluated, regardless of whether the optional is empty.
    
    This behavior change increases the invocation count of
    `peekMessageMetadata` and makes the `metadataArray` cache redundant.
    
    ### Modifications
    
    - Use `orElseGet` instead of `orElse` in `AbstractBaseDispatcher`.
    - Add a new static method `Commands#peekAndCopyMessageMetadata` that
      returns a `MessageMetadata` instance allocated from heap memory.
    - Call `peekAndCopyMessageMetadata` to cache all message metadata
      instances in `PersistentDispatcherMultipleConsumers`.
    
    ### Verifying this change
    
    It's hard to add tests for this change. As explained above, #15967 only
    degrades performance and does not affect correctness.
---
 .../pulsar/broker/service/AbstractBaseDispatcher.java  |  2 +-
 .../PersistentDispatcherMultipleConsumers.java         |  2 +-
 .../org/apache/pulsar/common/protocol/Commands.java    | 18 ++++++++++++++++++
 3 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index e8537fe606d..dc71bcf3501 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -131,7 +131,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
             ByteBuf metadataAndPayload = entry.getDataBuffer();
             final int metadataIndex = i + startOffset;
             final MessageMetadata msgMetadata = optMetadataArray.map(metadataArray -> metadataArray[metadataIndex])
-                    .orElse(Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1));
+                    .orElseGet(() -> Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1));
             if (CollectionUtils.isNotEmpty(entryFilters)) {
                 fillContext(filterContext, msgMetadata, subscription, consumer);
                 EntryFilter.FilterResult filterResult = getFilterResult(filterContext, entry, entryFilters);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 2438f9ab8ac..78c0a996431 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -515,7 +515,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             return;
         }
         final MessageMetadata[] metadataArray = entries.stream()
-                .map(entry -> Commands.peekMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1))
+                .map(entry -> Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1))
                 .toArray(MessageMetadata[]::new);
         int remainingMessages = Stream.of(metadataArray).filter(Objects::nonNull)
                 .map(MessageMetadata::getNumMessagesInBatch)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 285a12321c1..46c22375b77 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1791,6 +1791,24 @@ public class Commands {
         }
     }
 
+    /**
+     * Peek the message metadata from the buffer and return a deep copy of it, or null if the peek returned null.
+     *
+     * If you want to hold multiple {@link MessageMetadata} instances from multiple buffers, you must call this method
+     * rather than {@link Commands#peekMessageMetadata(ByteBuf, String, long)}, which returns a thread local reference,
+     * see {@link Commands#LOCAL_MESSAGE_METADATA}.
+     */
+    public static MessageMetadata peekAndCopyMessageMetadata(
+            ByteBuf metadataAndPayload, String subscription, long consumerId) {
+        final MessageMetadata localMetadata = peekMessageMetadata(metadataAndPayload, subscription, consumerId);
+        if (localMetadata == null) {
+            return null;
+        }
+        final MessageMetadata metadata = new MessageMetadata();
+        metadata.copyFrom(localMetadata);
+        return metadata;
+    }
+
     private static final byte[] NONE_KEY = "NONE_KEY".getBytes(StandardCharsets.UTF_8);
     public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) {
         try {