Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/06/04 20:31:14 UTC

[pulsar] branch master updated: Fix the out of index issue when dispatch messages based on the avgBatchSizePerMsg. (#10828)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e3cfbf8  Fix the out of index issue when dispatch messages based on the avgBatchSizePerMsg. (#10828)
e3cfbf8 is described below

commit e3cfbf8fadb98bc83ddab057239eb024a00b5f6d
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat Jun 5 04:30:18 2021 +0800

    Fix the out of index issue when dispatch messages based on the avgBatchSizePerMsg. (#10828)
    
    Using avgBatchSizePerMsg to calculate the number of entries to dispatch can
    yield a count that exceeds the number of remaining entries.
    The fix uses Math.min(start + messagesForC, entries.size()) as the end index
    of the sub-list to avoid an IndexOutOfBoundsException.
---
 .../service/persistent/PersistentDispatcherMultipleConsumers.java   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 9b8616b..6756c4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -519,16 +519,16 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             messagesForC = Math.max(messagesForC / avgBatchSizePerMsg, 1);
 
             if (messagesForC > 0) {
-
+                int end = Math.min(start + messagesForC, entries.size());
                 // remove positions first from replay list first : sendMessages recycles entries
                 if (readType == ReadType.Replay) {
-                    entries.subList(start, start + messagesForC).forEach(entry -> {
+                    entries.subList(start, end).forEach(entry -> {
                         messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId());
                     });
                 }
 
                 SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
-                List<Entry> entriesForThisConsumer = entries.subList(start, start + messagesForC);
+                List<Entry> entriesForThisConsumer = entries.subList(start, end);
 
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
                 EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
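
The clamping applied in the hunk above can be reproduced outside the broker. The following is a minimal standalone sketch, not Pulsar code: the class ClampedSubListSketch and the helper sliceForConsumer are hypothetical names, and plain strings stand in for Entry objects. It assumes start <= entries.size(), as the dispatcher loop appears to guarantee.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone sketch; not part of PersistentDispatcherMultipleConsumers.
final class ClampedSubListSketch {

    // Returns the slice of entries for one consumer, clamping the end index
    // to entries.size() in the same way the patch does.
    // Assumes 0 <= start <= entries.size().
    static <T> List<T> sliceForConsumer(List<T> entries, int start, int messagesForC) {
        int end = Math.min(start + messagesForC, entries.size());
        return entries.subList(start, end);
    }

    public static void main(String[] args) {
        List<String> entries = Arrays.asList("e0", "e1", "e2", "e3", "e4");
        int start = 3;
        int messagesForC = 4; // the estimate overshoots: only 2 entries remain

        // Unclamped call, as in the pre-patch code: toIndex (7) > size (5).
        try {
            entries.subList(start, start + messagesForC);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("unclamped subList failed: " + e);
        }

        // Clamped call: the end index is min(7, 5) = 5.
        System.out.println(sliceForConsumer(entries, start, messagesForC)); // prints [e3, e4]
    }
}
```

With the clamp, a consumer whose estimated share is larger than what is left simply receives the remaining entries, and the sizes passed to EntryBatchSizes.get and EntryBatchIndexesAcks.get follow from entriesForThisConsumer.size() rather than from the unclamped estimate.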