You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:53:13 UTC
[pulsar] 07/10: Fix ArrayIndexOutOfBoundsException in batch index
ack. (#7483)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 76c0939c09b8baa25924b52eb2078a6aa1112728
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jul 9 12:58:50 2020 +0800
Fix ArrayIndexOutOfBoundsException in batch index ack. (#7483)
(cherry picked from commit beb9e3be60513bdfbd0e412a68747b97714af1d7)
---
.../org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java | 9 +++++++--
.../persistent/PersistentDispatcherMultipleConsumers.java | 2 +-
.../persistent/PersistentDispatcherSingleActiveConsumer.java | 2 +-
.../PersistentStickyKeyDispatcherMultipleConsumers.java | 2 +-
4 files changed, 10 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
index e41a2909..175f5ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
@@ -53,8 +53,13 @@ public class EntryBatchIndexesAcks {
handle.recycle(this);
}
- public static EntryBatchIndexesAcks get() {
- return RECYCLER.get();
+ public static EntryBatchIndexesAcks get(int entriesListSize) {
+ EntryBatchIndexesAcks ebi = RECYCLER.get();
+
+ if (ebi.indexesAcks.length < entriesListSize) {
+ ebi.indexesAcks = new Pair[entriesListSize];
+ }
+ return ebi;
}
private EntryBatchIndexesAcks(Recycler.Handle<EntryBatchIndexesAcks> handle) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index b8255ba..c24e762 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -525,7 +525,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
List<Entry> entriesForThisConsumer = entries.subList(start, start + messagesForC);
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
- EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
+ EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
filterEntriesForConsumer(entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 5bb7629..58a63bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -237,7 +237,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
} else {
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
- EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
+ EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entries.size());
filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
int totalMessages = sendMessageInfo.getTotalMessages();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 420552c..35d645a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -188,7 +188,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
- EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
+ EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC);
filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),