You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/06/08 08:27:59 UTC
[pulsar] branch master updated: [refactor][broker] Remove EntryWrapper usages and use MessageMetadata instead (#15967)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c33b12d0134 [refactor][broker] Remove EntryWrapper usages and use MessageMetadata instead (#15967)
c33b12d0134 is described below
commit c33b12d01340d9a5c63199572f888bd7207e8455
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Jun 8 16:27:51 2022 +0800
[refactor][broker] Remove EntryWrapper usages and use MessageMetadata instead (#15967)
### Motivation
https://github.com/apache/pulsar/pull/7266 introduced the `EntryWrapper`
to store the `Entry` object and the associated `MessageMetadata` if it
exists. However, the `getEntry` field of `EntryWrapper` is never used.
There is no need to allocate memory for `EntryWrapper`, even if it's
allocated from the recycler pool.
### Modifications
- Calculate the remaining messages without creating `EntryWrapper`
object, just iterate over the parsed message metadata list.
- Pass an optional `MessageMetadata` array to `filterEntriesForConsumer`
and add the JavaDocs for these two parameters.
After that, Remove unused `EntryWrapper` and `updateEntryWrapperWithMetadata`.
This PR uses functional programming style to make code more simple and clear.
---
.../broker/service/AbstractBaseDispatcher.java | 48 +++++-----------
.../apache/pulsar/broker/service/EntryWrapper.java | 67 ----------------------
.../PersistentDispatcherMultipleConsumers.java | 22 +++----
3 files changed, 26 insertions(+), 111 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 7fc40038c7c..e8537fe606d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -76,29 +76,6 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
}
}
- /**
- * Update Entries with the metadata of each entry.
- *
- * @param entries
- * @return
- */
- protected int updateEntryWrapperWithMetadata(EntryWrapper[] entryWrappers, List<Entry> entries) {
- int totalMessages = 0;
- for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
- Entry entry = entries.get(i);
- if (entry == null) {
- continue;
- }
-
- ByteBuf metadataAndPayload = entry.getDataBuffer();
- MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);
- EntryWrapper entryWrapper = EntryWrapper.get(entry, msgMetadata);
- entryWrappers[i] = entryWrapper;
- int batchSize = msgMetadata.getNumMessagesInBatch();
- totalMessages += batchSize;
- }
- return totalMessages;
- }
/**
* Filter messages that are being sent to a consumers.
@@ -127,7 +104,17 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
isReplayRead, consumer);
}
- public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int entryWrapperOffset,
+
+ /**
+ * Filter entries with prefetched message metadata range so that there is no need to peek metadata from Entry.
+ *
+ * @param optMetadataArray the optional message metadata array
+ * @param startOffset the index in `optMetadataArray` of the first Entry's message metadata
+ *
+ * @see AbstractBaseDispatcher#filterEntriesForConsumer(List, EntryBatchSizes, SendMessageInfo,
+ * EntryBatchIndexesAcks, ManagedCursor, boolean, Consumer)
+ */
+ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray, int startOffset,
List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo,
EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
int totalMessages = 0;
@@ -142,17 +129,12 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
continue;
}
ByteBuf metadataAndPayload = entry.getDataBuffer();
- int entryWrapperIndex = i + entryWrapperOffset;
- MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[entryWrapperIndex] != null
- ? entryWrapper.get()[entryWrapperIndex].getMetadata()
- : null;
- msgMetadata = msgMetadata == null
- ? Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1)
- : msgMetadata;
- EntryFilter.FilterResult filterResult = EntryFilter.FilterResult.ACCEPT;
+ final int metadataIndex = i + startOffset;
+ final MessageMetadata msgMetadata = optMetadataArray.map(metadataArray -> metadataArray[metadataIndex])
+ .orElse(Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1));
if (CollectionUtils.isNotEmpty(entryFilters)) {
fillContext(filterContext, msgMetadata, subscription, consumer);
- filterResult = getFilterResult(filterContext, entry, entryFilters);
+ EntryFilter.FilterResult filterResult = getFilterResult(filterContext, entry, entryFilters);
if (filterResult == EntryFilter.FilterResult.REJECT) {
entriesToFiltered.add(entry.getPosition());
entries.set(i, null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryWrapper.java
deleted file mode 100644
index a1b4e58a747..00000000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryWrapper.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service;
-
-import io.netty.util.Recycler;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-
-public class EntryWrapper {
- private Entry entry = null;
- private MessageMetadata metadata = new MessageMetadata();
- private boolean hasMetadata = false;
-
- public static EntryWrapper get(Entry entry, MessageMetadata metadata) {
- EntryWrapper entryWrapper = RECYCLER.get();
- entryWrapper.entry = entry;
- if (metadata != null) {
- entryWrapper.hasMetadata = true;
- entryWrapper.metadata.copyFrom(metadata);
- }
- entryWrapper.metadata.copyFrom(metadata);
- return entryWrapper;
- }
-
- private EntryWrapper(Recycler.Handle<EntryWrapper> handle) {
- this.handle = handle;
- }
-
- public Entry getEntry() {
- return entry;
- }
-
- public MessageMetadata getMetadata() {
- return hasMetadata ? metadata : null;
- }
-
- private final Recycler.Handle<EntryWrapper> handle;
- private static final Recycler<EntryWrapper> RECYCLER = new Recycler<EntryWrapper>() {
- @Override
- protected EntryWrapper newObject(Handle<EntryWrapper> handle) {
- return new EntryWrapper(handle);
- }
- };
-
- public void recycle() {
- entry = null;
- hasMetadata = false;
- metadata.clear();
- handle.recycle(this);
- }
-}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a874706433e..872a8d6ab60 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -24,12 +24,14 @@ import com.google.common.collect.Range;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.stream.Stream;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -47,7 +49,6 @@ import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
-import org.apache.pulsar.broker.service.EntryWrapper;
import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
@@ -59,6 +60,7 @@ import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferEx
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -512,8 +514,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
readMoreEntries();
return;
}
- EntryWrapper[] entryWrappers = new EntryWrapper[entries.size()];
- int remainingMessages = updateEntryWrapperWithMetadata(entryWrappers, entries);
+ final MessageMetadata[] metadataArray = entries.stream()
+ .map(entry -> Commands.peekMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1))
+ .toArray(MessageMetadata[]::new);
+ int remainingMessages = Stream.of(metadataArray).filter(Objects::nonNull)
+ .map(MessageMetadata::getNumMessagesInBatch)
+ .reduce(0, Integer::sum);
+
int start = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
@@ -564,7 +571,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
- totalEntries += filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start,
+ totalEntries += filterEntriesForConsumer(Optional.of(metadataArray), start,
entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
readType == ReadType.Replay, c);
@@ -587,13 +594,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
}
- // release entry-wrapper
- for (EntryWrapper entry : entryWrappers) {
- if (entry != null) {
- entry.recycle();
- }
- }
-
// acquire message-dispatch permits for already delivered messages
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {