You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/02/08 07:57:45 UTC

[pulsar] branch branch-2.11 updated: [cleanup][broker] Simplify extract entryMetadata code in filterEntriesForConsumer (#18729)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 644a2c5f8d2 [cleanup][broker] Simplify extract entryMetadata code in filterEntriesForConsumer (#18729)
644a2c5f8d2 is described below

commit 644a2c5f8d20d6889d7f27c8fbe07852ea00f5bd
Author: lifepuzzlefun <wj...@163.com>
AuthorDate: Mon Dec 12 14:18:15 2022 +0800

    [cleanup][broker] Simplify extract entryMetadata code in filterEntriesForConsumer (#18729)
    
    ### Motivation
    
    The original logic for extracting entry metadata is based on `Optional.map.orElseGet`, which can be simplified to a plain `if` condition; the simplified form also performs better on the hot code path.
    
    ### Modifications
    
    1. Replace the `Optional`-based code with an explicit null check.
    2. Remove the duplicate `hasChunk` check logic in `PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers`.
---
 .../broker/service/AbstractBaseDispatcher.java     | 29 ++++++++++++++--------
 .../PersistentDispatcherMultipleConsumers.java     |  5 +---
 2 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 677b3a84a4c..1157ae65558 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 import lombok.extern.slf4j.Slf4j;
@@ -47,6 +46,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 @Slf4j
 public abstract class AbstractBaseDispatcher extends EntryFilterSupport implements Dispatcher {
@@ -94,22 +94,25 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
     public int filterEntriesForConsumer(List<? extends Entry> entries, EntryBatchSizes batchSizes,
             SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks,
             ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
-        return filterEntriesForConsumer(Optional.empty(), 0, entries, batchSizes, sendMessageInfo, indexesAcks, cursor,
+        return filterEntriesForConsumer(null, 0, entries, batchSizes,
+                sendMessageInfo, indexesAcks, cursor,
                 isReplayRead, consumer);
     }
 
     /**
      * Filter entries with prefetched message metadata range so that there is no need to peek metadata from Entry.
      *
-     * @param optMetadataArray the optional message metadata array
+     * @param metadataArray the optional message metadata array. need check if null pass.
      * @param startOffset the index in `optMetadataArray` of the first Entry's message metadata
      *
      * @see AbstractBaseDispatcher#filterEntriesForConsumer(List, EntryBatchSizes, SendMessageInfo,
      *   EntryBatchIndexesAcks, ManagedCursor, boolean, Consumer)
      */
-    public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray, int startOffset,
-             List<? extends Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo,
-             EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
+    public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, int startOffset,
+                                        List<? extends Entry> entries, EntryBatchSizes batchSizes,
+                                        SendMessageInfo sendMessageInfo,
+                                        EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor,
+                                        boolean isReplayRead, Consumer consumer) {
         int totalMessages = 0;
         long totalBytes = 0;
         int totalChunkedMessages = 0;
@@ -124,11 +127,15 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
             }
             ByteBuf metadataAndPayload = entry.getDataBuffer();
             final int metadataIndex = i + startOffset;
-            final MessageMetadata msgMetadata = optMetadataArray.map(metadataArray -> metadataArray[metadataIndex])
-                    .orElseGet(() -> (entry instanceof EntryAndMetadata)
-                            ? ((EntryAndMetadata) entry).getMetadata()
-                            : Commands.peekAndCopyMessageMetadata(metadataAndPayload, subscription.toString(), -1)
-                    );
+
+            MessageMetadata msgMetadata;
+            if (metadataArray != null) {
+                msgMetadata = metadataArray[metadataIndex];
+            } else if (entry instanceof EntryAndMetadata) {
+                msgMetadata = ((EntryAndMetadata) entry).getMetadata();
+            } else {
+                msgMetadata = Commands.peekAndCopyMessageMetadata(metadataAndPayload, subscription.toString(), -1);
+            }
 
             int entryMsgCnt = msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch();
             if (hasFilter) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6a964f20c37..20c1173286e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -613,9 +613,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 }
             }
             metadataArray[i] = metadata;
-            if (!hasChunk && metadataArray[i] != null && metadataArray[i].hasUuid()) {
-                hasChunk = true;
-            }
         }
         if (hasChunk) {
             return sendChunkedMessagesToConsumers(readType, entries, metadataArray);
@@ -676,7 +673,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
             EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
             EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
-            totalEntries += filterEntriesForConsumer(Optional.of(metadataArray), start,
+            totalEntries += filterEntriesForConsumer(metadataArray, start,
                     entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
                     readType == ReadType.Replay, c);