You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/06/08 14:07:59 UTC

[pulsar] branch master updated: [cleanup][broker] Remove redundant `messagesForC` check in multi-consumer dispatcher (#15969)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 298a573295f [cleanup][broker] Remove redundant `messagesForC` check in multi-consumer dispatcher (#15969)
298a573295f is described below

commit 298a573295f845e46f8a55cee366b6db63e997c2
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Jun 8 22:07:49 2022 +0800

    [cleanup][broker] Remove redundant `messagesForC` check in multi-consumer dispatcher (#15969)
    
    ### Motivation
    
    `PersistentDispatcherMultipleConsumers#sendMessagesToConsumers` checks
    `messagesForC > 0` even though `messagesForC` is always greater than 0 at
    that point, because it has just been assigned `Math.max(..., 1)`.
    
    ### Modifications
    
    Remove the `if (messagesForC > 0)` check. In addition, call
    `entries.subList(start, end)` once, before handling the replay case, and
    reuse the result so that `subList` is not called twice.
---
 .../PersistentDispatcherMultipleConsumers.java     | 65 +++++++++++-----------
 1 file changed, 32 insertions(+), 33 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 872a8d6ab60..2438f9ab8ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -557,41 +557,40 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                     serviceConfig.getDispatcherMaxRoundRobinBatchSize());
             messagesForC = Math.max(messagesForC / avgBatchSizePerMsg, 1);
 
-            if (messagesForC > 0) {
-                int end = Math.min(start + messagesForC, entries.size());
-                // remove positions first from replay list first : sendMessages recycles entries
-                if (readType == ReadType.Replay) {
-                    entries.subList(start, end).forEach(entry -> {
-                        redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId());
-                    });
-                }
+            int end = Math.min(start + messagesForC, entries.size());
+            List<Entry> entriesForThisConsumer = entries.subList(start, end);
 
-                SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
-                List<Entry> entriesForThisConsumer = entries.subList(start, end);
-
-                EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
-                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
-                totalEntries += filterEntriesForConsumer(Optional.of(metadataArray), start,
-                        entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
-                        readType == ReadType.Replay, c);
-
-                c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
-                        sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
-
-                int msgSent = sendMessageInfo.getTotalMessages();
-                remainingMessages -= msgSent;
-                start += messagesForC;
-                entriesToDispatch -= messagesForC;
-                TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
-                        -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
-                if (log.isDebugEnabled()){
-                    log.debug("[{}] Added -({} minus {}) permits to TOTAL_AVAILABLE_PERMITS_UPDATER in "
-                                    + "PersistentDispatcherMultipleConsumers",
-                            name, msgSent, batchIndexesAcks.getTotalAckedIndexCount());
-                }
-                totalMessagesSent += sendMessageInfo.getTotalMessages();
-                totalBytesSent += sendMessageInfo.getTotalBytes();
+            // remove positions first from replay list first : sendMessages recycles entries
+            if (readType == ReadType.Replay) {
+                entriesForThisConsumer.forEach(entry -> {
+                    redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId());
+                });
+            }
+
+            SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
+
+            EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
+            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
+            totalEntries += filterEntriesForConsumer(Optional.of(metadataArray), start,
+                    entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
+                    readType == ReadType.Replay, c);
+
+            c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
+                    sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
+
+            int msgSent = sendMessageInfo.getTotalMessages();
+            remainingMessages -= msgSent;
+            start += messagesForC;
+            entriesToDispatch -= messagesForC;
+            TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
+                    -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
+            if (log.isDebugEnabled()){
+                log.debug("[{}] Added -({} minus {}) permits to TOTAL_AVAILABLE_PERMITS_UPDATER in "
+                                + "PersistentDispatcherMultipleConsumers",
+                        name, msgSent, batchIndexesAcks.getTotalAckedIndexCount());
             }
+            totalMessagesSent += sendMessageInfo.getTotalMessages();
+            totalBytesSent += sendMessageInfo.getTotalBytes();
         }
 
         // acquire message-dispatch permits for already delivered messages