You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:53:13 UTC

[pulsar] 07/10: Fix ArrayIndexOutOfBoundsException in batch index ack. (#7483)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 76c0939c09b8baa25924b52eb2078a6aa1112728
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jul 9 12:58:50 2020 +0800

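    Fix ArrayIndexOutOfBoundsException in batch index ack. (#7483)
    
    EntryBatchIndexesAcks.get() now takes the number of entries being
    dispatched and allocates a larger indexesAcks array when the recycled
    instance's array is smaller than that count. The three persistent
    dispatchers pass the size of the entry list they are about to send.
    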
    (cherry picked from commit beb9e3be60513bdfbd0e412a68747b97714af1d7)
---
 .../org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java  | 9 +++++++--
 .../persistent/PersistentDispatcherMultipleConsumers.java        | 2 +-
 .../persistent/PersistentDispatcherSingleActiveConsumer.java     | 2 +-
 .../PersistentStickyKeyDispatcherMultipleConsumers.java          | 2 +-
 4 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
index e41a2909..175f5ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
@@ -53,8 +53,13 @@ public class EntryBatchIndexesAcks {
         handle.recycle(this);
     }
 
-    public static EntryBatchIndexesAcks get() {
-        return RECYCLER.get();
+    public static EntryBatchIndexesAcks get(int entriesListSize) {
+        EntryBatchIndexesAcks ebi = RECYCLER.get();
+
+        if (ebi.indexesAcks.length < entriesListSize) {
+            ebi.indexesAcks = new Pair[entriesListSize];
+        }
+        return ebi;
     }
 
     private EntryBatchIndexesAcks(Recycler.Handle<EntryBatchIndexesAcks> handle) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index b8255ba..c24e762 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -525,7 +525,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 List<Entry> entriesForThisConsumer = entries.subList(start, start + messagesForC);
 
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
-                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
+                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
                 filterEntriesForConsumer(entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
 
                 c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 5bb7629..58a63bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -237,7 +237,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
         } else {
             EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
-            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
+            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entries.size());
             filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
 
             int totalMessages = sendMessageInfo.getTotalMessages();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 420552c..35d645a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -188,7 +188,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
                 SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
-                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
+                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC);
                 filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
 
                 consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),