You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/06/08 08:27:59 UTC

[pulsar] branch master updated: [refactor][broker] Remove EntryWrapper usages and use MessageMetadata instead (#15967)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c33b12d0134 [refactor][broker] Remove EntryWrapper usages and use MessageMetadata instead (#15967)
c33b12d0134 is described below

commit c33b12d01340d9a5c63199572f888bd7207e8455
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Jun 8 16:27:51 2022 +0800

    [refactor][broker] Remove EntryWrapper usages and use MessageMetadata instead (#15967)
    
    ### Motivation
    
    https://github.com/apache/pulsar/pull/7266 introduced the `EntryWrapper`
    to store the `Entry` object and the associated `MessageMetadata` if it
    exists. However, the `getEntry` field of `EntryWrapper` is never used.
    There is no need to allocate memory for `EntryWrapper`, even if it's
    allocated from the recycler pool.
    
    ### Modifications
    
    - Calculate the remaining messages without creating `EntryWrapper`
      object, just iterate over the parsed message metadata list.
    - Pass an optional `MessageMetadata` array to `filterEntriesForConsumer`
      and add the JavaDocs for these two parameters.
    
    After that, Remove unused `EntryWrapper` and `updateEntryWrapperWithMetadata`.
    This PR uses functional programming style to make code more simple and clear.
---
 .../broker/service/AbstractBaseDispatcher.java     | 48 +++++-----------
 .../apache/pulsar/broker/service/EntryWrapper.java | 67 ----------------------
 .../PersistentDispatcherMultipleConsumers.java     | 22 +++----
 3 files changed, 26 insertions(+), 111 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 7fc40038c7c..e8537fe606d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -76,29 +76,6 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
         }
     }
 
-    /**
-     * Update Entries with the metadata of each entry.
-     *
-     * @param entries
-     * @return
-     */
-    protected int updateEntryWrapperWithMetadata(EntryWrapper[] entryWrappers, List<Entry> entries) {
-        int totalMessages = 0;
-        for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
-            Entry entry = entries.get(i);
-            if (entry == null) {
-                continue;
-            }
-
-            ByteBuf metadataAndPayload = entry.getDataBuffer();
-            MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);
-            EntryWrapper entryWrapper = EntryWrapper.get(entry, msgMetadata);
-            entryWrappers[i] = entryWrapper;
-            int batchSize = msgMetadata.getNumMessagesInBatch();
-            totalMessages += batchSize;
-        }
-        return totalMessages;
-    }
 
     /**
      * Filter messages that are being sent to a consumers.
@@ -127,7 +104,17 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
                 isReplayRead, consumer);
     }
 
-    public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int entryWrapperOffset,
+
+    /**
+     * Filter entries with prefetched message metadata range so that there is no need to peek metadata from Entry.
+     *
+     * @param optMetadataArray the optional message metadata array
+     * @param startOffset the index in `optMetadataArray` of the first Entry's message metadata
+     *
+     * @see AbstractBaseDispatcher#filterEntriesForConsumer(List, EntryBatchSizes, SendMessageInfo,
+     *   EntryBatchIndexesAcks, ManagedCursor, boolean, Consumer)
+     */
+    public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray, int startOffset,
              List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo,
              EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
         int totalMessages = 0;
@@ -142,17 +129,12 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
                 continue;
             }
             ByteBuf metadataAndPayload = entry.getDataBuffer();
-            int entryWrapperIndex = i + entryWrapperOffset;
-            MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[entryWrapperIndex] != null
-                    ? entryWrapper.get()[entryWrapperIndex].getMetadata()
-                    : null;
-            msgMetadata = msgMetadata == null
-                    ? Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1)
-                    : msgMetadata;
-            EntryFilter.FilterResult filterResult = EntryFilter.FilterResult.ACCEPT;
+            final int metadataIndex = i + startOffset;
+            final MessageMetadata msgMetadata = optMetadataArray.map(metadataArray -> metadataArray[metadataIndex])
+                    .orElse(Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1));
             if (CollectionUtils.isNotEmpty(entryFilters)) {
                 fillContext(filterContext, msgMetadata, subscription, consumer);
-                filterResult = getFilterResult(filterContext, entry, entryFilters);
+                EntryFilter.FilterResult filterResult = getFilterResult(filterContext, entry, entryFilters);
                 if (filterResult == EntryFilter.FilterResult.REJECT) {
                     entriesToFiltered.add(entry.getPosition());
                     entries.set(i, null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryWrapper.java
deleted file mode 100644
index a1b4e58a747..00000000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryWrapper.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service;
-
-import io.netty.util.Recycler;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-
-public class EntryWrapper {
-    private Entry entry = null;
-    private MessageMetadata metadata = new MessageMetadata();
-    private boolean hasMetadata = false;
-
-    public static EntryWrapper get(Entry entry, MessageMetadata metadata) {
-        EntryWrapper entryWrapper = RECYCLER.get();
-        entryWrapper.entry = entry;
-        if (metadata != null) {
-            entryWrapper.hasMetadata = true;
-            entryWrapper.metadata.copyFrom(metadata);
-        }
-        entryWrapper.metadata.copyFrom(metadata);
-        return entryWrapper;
-    }
-
-    private EntryWrapper(Recycler.Handle<EntryWrapper> handle) {
-        this.handle = handle;
-    }
-
-    public Entry getEntry() {
-        return entry;
-    }
-
-    public MessageMetadata getMetadata() {
-        return hasMetadata ? metadata : null;
-    }
-
-    private final Recycler.Handle<EntryWrapper> handle;
-    private static final Recycler<EntryWrapper> RECYCLER = new Recycler<EntryWrapper>() {
-        @Override
-        protected EntryWrapper newObject(Handle<EntryWrapper> handle) {
-            return new EntryWrapper(handle);
-        }
-    };
-
-    public void recycle() {
-        entry = null;
-        hasMetadata = false;
-        metadata.clear();
-        handle.recycle(this);
-    }
-}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a874706433e..872a8d6ab60 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -24,12 +24,14 @@ import com.google.common.collect.Range;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.stream.Stream;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -47,7 +49,6 @@ import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
-import org.apache.pulsar.broker.service.EntryWrapper;
 import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
 import org.apache.pulsar.broker.service.RedeliveryTracker;
 import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
@@ -59,6 +60,7 @@ import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferEx
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -512,8 +514,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             readMoreEntries();
             return;
         }
-        EntryWrapper[] entryWrappers = new EntryWrapper[entries.size()];
-        int remainingMessages = updateEntryWrapperWithMetadata(entryWrappers, entries);
+        final MessageMetadata[] metadataArray = entries.stream()
+                .map(entry -> Commands.peekMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1))
+                .toArray(MessageMetadata[]::new);
+        int remainingMessages = Stream.of(metadataArray).filter(Objects::nonNull)
+                .map(MessageMetadata::getNumMessagesInBatch)
+                .reduce(0, Integer::sum);
+
         int start = 0;
         long totalMessagesSent = 0;
         long totalBytesSent = 0;
@@ -564,7 +571,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
                 EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
-                totalEntries += filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start,
+                totalEntries += filterEntriesForConsumer(Optional.of(metadataArray), start,
                         entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
                         readType == ReadType.Replay, c);
 
@@ -587,13 +594,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             }
         }
 
-        // release entry-wrapper
-        for (EntryWrapper entry : entryWrappers) {
-            if (entry != null) {
-                entry.recycle();
-            }
-        }
-
         // acquire message-dispatch permits for already delivered messages
         long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
         if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {