You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2023/02/25 12:24:47 UTC

[pulsar] branch branch-2.10 updated: [cleanup][broker] Simplify extract entryMetadata code in filterEntriesForConsumer (#18729)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new e4a1e675eee [cleanup][broker] Simplify extract entryMetadata code in filterEntriesForConsumer (#18729)
e4a1e675eee is described below

commit e4a1e675eee06e6cd55622eb217cae6fa97c225a
Author: lifepuzzlefun <wj...@163.com>
AuthorDate: Mon Dec 12 14:18:15 2022 +0800

    [cleanup][broker] Simplify extract entryMetadata code in filterEntriesForConsumer (#18729)
    
    The original logic for extracting entry metadata is based on `Optional.map.orElseGet`; it can be simplified to a plain `if` condition, which also performs better on this hot code path.
    
    1. Replace the Optional-based code with an explicit null check.
    2. Remove the duplicate hasChunk check logic in `PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers`.
    
    (cherry picked from commit a1e3b80af804c8697d24b5be51f417650ec67112)
---
 .../broker/service/AbstractBaseDispatcher.java     | 26 ++++++++++++++++------
 .../PersistentDispatcherMultipleConsumers.java     |  2 +-
 2 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index da6be55f8e1..6b9ddcc1162 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
@@ -52,6 +51,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 @Slf4j
 public abstract class AbstractBaseDispatcher implements Dispatcher {
@@ -128,13 +128,25 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
     public int filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes,
             SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks,
             ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
-        return filterEntriesForConsumer(Optional.empty(), 0, entries, batchSizes, sendMessageInfo, indexesAcks, cursor,
+        return filterEntriesForConsumer(null, 0, entries, batchSizes,
+                sendMessageInfo, indexesAcks, cursor,
                 isReplayRead, consumer);
     }
 
-    public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int entryWrapperOffset,
-             List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo,
-             EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
+    /**
+     * Filter entries with prefetched message metadata range so that there is no need to peek metadata from Entry.
+     *
+     * @param entryWrapper the optional array of prefetched entry wrappers (message metadata); may be null.
+     * @param entryWrapperOffset the index in {@code entryWrapper} of the first Entry's message metadata
+     *
+     * @see AbstractBaseDispatcher#filterEntriesForConsumer(List, EntryBatchSizes, SendMessageInfo,
+     *   EntryBatchIndexesAcks, ManagedCursor, boolean, Consumer)
+     */
+    public int filterEntriesForConsumer(@Nullable EntryWrapper[] entryWrapper, int entryWrapperOffset,
+                                        List<? extends Entry> entries, EntryBatchSizes batchSizes,
+                                        SendMessageInfo sendMessageInfo,
+                                        EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor,
+                                        boolean isReplayRead, Consumer consumer) {
         int totalMessages = 0;
         long totalBytes = 0;
         int totalChunkedMessages = 0;
@@ -148,8 +160,8 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
             }
             ByteBuf metadataAndPayload = entry.getDataBuffer();
             int entryWrapperIndex = i + entryWrapperOffset;
-            MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[entryWrapperIndex] != null
-                    ? entryWrapper.get()[entryWrapperIndex].getMetadata()
+            MessageMetadata msgMetadata = entryWrapper != null  && entryWrapper[entryWrapperIndex] != null
+                    ? entryWrapper[entryWrapperIndex].getMetadata()
                     : null;
             msgMetadata = msgMetadata == null
                     ? Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index b4397635c41..f74ac857999 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -589,7 +589,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
                 EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
-                totalEntries += filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start,
+                totalEntries += filterEntriesForConsumer(entryWrappers, start,
                         entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
                         readType == ReadType.Replay, c);