You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/06/10 07:12:37 UTC
[pulsar] branch master updated: [fix][broker] Avoid storing `MessageMetadata` instances returned by `peekMessageMetadata` (#15983)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 36690f5a7f7 [fix][broker] Avoid storing `MessageMetadata` instances returned by `peekMessageMetadata` (#15983)
36690f5a7f7 is described below
commit 36690f5a7f79fb744c1d61969f2a277cadd0e704
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Jun 10 15:12:31 2022 +0800
[fix][broker] Avoid storing `MessageMetadata` instances returned by `peekMessageMetadata` (#15983)
### Motivation
https://github.com/apache/pulsar/pull/15967 removed the `EntryWrapper`,
which holds an `Entry` instance that is never used. Instead, after the
refactoring, the `MessageMetadata` array is useless, see
https://github.com/apache/pulsar/blob/298a573295f845e46f8a55cee366b6db63e997c2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L517-L519
Each `MessageMetadata` instance in the array is returned by
`peekMessageMetadata`, whose returned value references a thread local
object `Commands#LOCAL_MESSAGE_METADATA`. It brings a problem that if
multiple entries were read, all `MessageMetadata` elements in the array
reference the same object.
However, accidentally, the wrong invocation of `Optional#orElse` saves
it. See
https://github.com/apache/pulsar/blob/298a573295f845e46f8a55cee366b6db63e997c2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L133-L134
Each time `peekMessageMetadata` is called, the thread local message
metadata will be updated. Unlike `orElseGet`, the expression in `orElse`
is always called no matter if the optional is empty.
This behavior change increases the invocations count of
`peekMessageMetadata` and the `metadataArray` cache became redundant.
### Modifications
- Use `orElseGet` instead of `orElse` in `AbstractBaseDispatcher`.
- Add a new static method `Commands#peekAndCopyMessageMetadata` that
returns a `MessageMetadata` instance allocated from heap memory.
- Call `peekAndCopyMessageMetadata` to cache all message metadata
instances in `PersistentDispatcherMultipleConsumers`.
### Verifying this change
It's hard to add tests. As I've explained before, #15967 only degrades the
performance and doesn't affect the correctness.
---
.../pulsar/broker/service/AbstractBaseDispatcher.java | 2 +-
.../PersistentDispatcherMultipleConsumers.java | 2 +-
.../org/apache/pulsar/common/protocol/Commands.java | 18 ++++++++++++++++++
3 files changed, 20 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index e8537fe606d..dc71bcf3501 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -131,7 +131,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
ByteBuf metadataAndPayload = entry.getDataBuffer();
final int metadataIndex = i + startOffset;
final MessageMetadata msgMetadata = optMetadataArray.map(metadataArray -> metadataArray[metadataIndex])
- .orElse(Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1));
+ .orElseGet(() -> Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1));
if (CollectionUtils.isNotEmpty(entryFilters)) {
fillContext(filterContext, msgMetadata, subscription, consumer);
EntryFilter.FilterResult filterResult = getFilterResult(filterContext, entry, entryFilters);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 2438f9ab8ac..78c0a996431 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -515,7 +515,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
return;
}
final MessageMetadata[] metadataArray = entries.stream()
- .map(entry -> Commands.peekMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1))
+ .map(entry -> Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1))
.toArray(MessageMetadata[]::new);
int remainingMessages = Stream.of(metadataArray).filter(Objects::nonNull)
.map(MessageMetadata::getNumMessagesInBatch)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 285a12321c1..46c22375b77 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1791,6 +1791,24 @@ public class Commands {
}
}
+ /**
+ * Peek the message metadata from the buffer and return a deep copy of the metadata.
+ *
+ * If you want to hold multiple {@link MessageMetadata} instances from multiple buffers, you must call this method
+ * rather than {@link Commands#peekMessageMetadata(ByteBuf, String, long)}, which returns a thread local reference,
+ * see {@link Commands#LOCAL_MESSAGE_METADATA}.
+ */
+ public static MessageMetadata peekAndCopyMessageMetadata(
+ ByteBuf metadataAndPayload, String subscription, long consumerId) {
+ final MessageMetadata localMetadata = peekMessageMetadata(metadataAndPayload, subscription, consumerId);
+ if (localMetadata == null) {
+ return null;
+ }
+ final MessageMetadata metadata = new MessageMetadata();
+ metadata.copyFrom(localMetadata);
+ return metadata;
+ }
+
private static final byte[] NONE_KEY = "NONE_KEY".getBytes(StandardCharsets.UTF_8);
public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) {
try {