You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/06/08 14:07:59 UTC
[pulsar] branch master updated: [cleanup][broker] Remove redundant `messagesForC` check in multi-consumer dispatcher (#15969)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 298a573295f [cleanup][broker] Remove redundant `messagesForC` check in multi-consumer dispatcher (#15969)
298a573295f is described below
commit 298a573295f845e46f8a55cee366b6db63e997c2
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Jun 8 22:07:49 2022 +0800
[cleanup][broker] Remove redundant `messagesForC` check in multi-consumer dispatcher (#15969)
### Motivation
`PersistentDispatcherMultipleConsumers#sendMessagesToConsumers` checks
`messagesForC > 0` even though `messagesForC` is always greater than 0 at that
point: it has just been assigned `Math.max(messagesForC / avgBatchSizePerMsg, 1)`.
### Modifications
Remove the `if (messagesForC > 0)` check. In addition, call
`entries.subList(start, end)` before handling the replay case
(`readType == ReadType.Replay`) to avoid `subList` being called twice.
---
.../PersistentDispatcherMultipleConsumers.java | 65 +++++++++++-----------
1 file changed, 32 insertions(+), 33 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 872a8d6ab60..2438f9ab8ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -557,41 +557,40 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
serviceConfig.getDispatcherMaxRoundRobinBatchSize());
messagesForC = Math.max(messagesForC / avgBatchSizePerMsg, 1);
- if (messagesForC > 0) {
- int end = Math.min(start + messagesForC, entries.size());
- // remove positions first from replay list first : sendMessages recycles entries
- if (readType == ReadType.Replay) {
- entries.subList(start, end).forEach(entry -> {
- redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId());
- });
- }
+ int end = Math.min(start + messagesForC, entries.size());
+ List<Entry> entriesForThisConsumer = entries.subList(start, end);
- SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
- List<Entry> entriesForThisConsumer = entries.subList(start, end);
-
- EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
- EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
- totalEntries += filterEntriesForConsumer(Optional.of(metadataArray), start,
- entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
- readType == ReadType.Replay, c);
-
- c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
- sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
-
- int msgSent = sendMessageInfo.getTotalMessages();
- remainingMessages -= msgSent;
- start += messagesForC;
- entriesToDispatch -= messagesForC;
- TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
- -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
- if (log.isDebugEnabled()){
- log.debug("[{}] Added -({} minus {}) permits to TOTAL_AVAILABLE_PERMITS_UPDATER in "
- + "PersistentDispatcherMultipleConsumers",
- name, msgSent, batchIndexesAcks.getTotalAckedIndexCount());
- }
- totalMessagesSent += sendMessageInfo.getTotalMessages();
- totalBytesSent += sendMessageInfo.getTotalBytes();
+ // remove positions first from replay list first : sendMessages recycles entries
+ if (readType == ReadType.Replay) {
+ entriesForThisConsumer.forEach(entry -> {
+ redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId());
+ });
+ }
+
+ SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
+
+ EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
+ EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
+ totalEntries += filterEntriesForConsumer(Optional.of(metadataArray), start,
+ entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
+ readType == ReadType.Replay, c);
+
+ c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
+ sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
+
+ int msgSent = sendMessageInfo.getTotalMessages();
+ remainingMessages -= msgSent;
+ start += messagesForC;
+ entriesToDispatch -= messagesForC;
+ TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
+ -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
+ if (log.isDebugEnabled()){
+ log.debug("[{}] Added -({} minus {}) permits to TOTAL_AVAILABLE_PERMITS_UPDATER in "
+ + "PersistentDispatcherMultipleConsumers",
+ name, msgSent, batchIndexesAcks.getTotalAckedIndexCount());
}
+ totalMessagesSent += sendMessageInfo.getTotalMessages();
+ totalBytesSent += sendMessageInfo.getTotalBytes();
}
// acquire message-dispatch permits for already delivered messages