You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/02/08 07:57:45 UTC
[pulsar] branch branch-2.11 updated: [cleanup][broker] Simplify extract entryMetadata code in filterEntriesForConsumer (#18729)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 644a2c5f8d2 [cleanup][broker] Simplify extract entryMetadata code in filterEntriesForConsumer (#18729)
644a2c5f8d2 is described below
commit 644a2c5f8d20d6889d7f27c8fbe07852ea00f5bd
Author: lifepuzzlefun <wj...@163.com>
AuthorDate: Mon Dec 12 14:18:15 2022 +0800
[cleanup][broker] Simplify extract entryMetadata code in filterEntriesForConsumer (#18729)
### Motivation
The original logic for extracting entry metadata is based on `Optional.map.orElseGet`, which can be simplified to a plain `if`/`else` null check; the simplified form also performs better on this hot code path.
### Modifications
1. Replace the `Optional`-based code with an explicit `if` null check.
2. Remove the duplicate `hasChunk` check logic in `PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers`.
---
.../broker/service/AbstractBaseDispatcher.java | 29 ++++++++++++++--------
.../PersistentDispatcherMultipleConsumers.java | 5 +---
2 files changed, 19 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 677b3a84a4c..1157ae65558 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import lombok.extern.slf4j.Slf4j;
@@ -47,6 +46,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
+import org.checkerframework.checker.nullness.qual.Nullable;
@Slf4j
public abstract class AbstractBaseDispatcher extends EntryFilterSupport implements Dispatcher {
@@ -94,22 +94,25 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
public int filterEntriesForConsumer(List<? extends Entry> entries, EntryBatchSizes batchSizes,
SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks,
ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
- return filterEntriesForConsumer(Optional.empty(), 0, entries, batchSizes, sendMessageInfo, indexesAcks, cursor,
+ return filterEntriesForConsumer(null, 0, entries, batchSizes,
+ sendMessageInfo, indexesAcks, cursor,
isReplayRead, consumer);
}
/**
* Filter entries with prefetched message metadata range so that there is no need to peek metadata from Entry.
*
- * @param optMetadataArray the optional message metadata array
+ * @param metadataArray the optional message metadata array. need check if null pass.
* @param startOffset the index in `optMetadataArray` of the first Entry's message metadata
*
* @see AbstractBaseDispatcher#filterEntriesForConsumer(List, EntryBatchSizes, SendMessageInfo,
* EntryBatchIndexesAcks, ManagedCursor, boolean, Consumer)
*/
- public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray, int startOffset,
- List<? extends Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo,
- EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
+ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, int startOffset,
+ List<? extends Entry> entries, EntryBatchSizes batchSizes,
+ SendMessageInfo sendMessageInfo,
+ EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor,
+ boolean isReplayRead, Consumer consumer) {
int totalMessages = 0;
long totalBytes = 0;
int totalChunkedMessages = 0;
@@ -124,11 +127,15 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
}
ByteBuf metadataAndPayload = entry.getDataBuffer();
final int metadataIndex = i + startOffset;
- final MessageMetadata msgMetadata = optMetadataArray.map(metadataArray -> metadataArray[metadataIndex])
- .orElseGet(() -> (entry instanceof EntryAndMetadata)
- ? ((EntryAndMetadata) entry).getMetadata()
- : Commands.peekAndCopyMessageMetadata(metadataAndPayload, subscription.toString(), -1)
- );
+
+ MessageMetadata msgMetadata;
+ if (metadataArray != null) {
+ msgMetadata = metadataArray[metadataIndex];
+ } else if (entry instanceof EntryAndMetadata) {
+ msgMetadata = ((EntryAndMetadata) entry).getMetadata();
+ } else {
+ msgMetadata = Commands.peekAndCopyMessageMetadata(metadataAndPayload, subscription.toString(), -1);
+ }
int entryMsgCnt = msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch();
if (hasFilter) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6a964f20c37..20c1173286e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -613,9 +613,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
}
metadataArray[i] = metadata;
- if (!hasChunk && metadataArray[i] != null && metadataArray[i].hasUuid()) {
- hasChunk = true;
- }
}
if (hasChunk) {
return sendChunkedMessagesToConsumers(readType, entries, metadataArray);
@@ -676,7 +673,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
- totalEntries += filterEntriesForConsumer(Optional.of(metadataArray), start,
+ totalEntries += filterEntriesForConsumer(metadataArray, start,
entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
readType == ReadType.Replay, c);