You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/25 04:47:37 UTC
[pulsar] 08/11: [Broker] Handle NPE when full key range isn't covered with active consumers (#11749)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9287e873df8f13f9ac88846eaa49dd411a8ebc4a
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Mon Aug 23 21:15:14 2021 +0300
[Broker] Handle NPE when full key range isn't covered with active consumers (#11749)
(cherry picked from commit 8027ab4e8763486de16d4b2f850b234b70a16b27)
---
.../PersistentStickyKeyDispatcherMultipleConsumers.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index c23b360..d4d64e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -175,8 +175,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
for (Entry entry : entries) {
int stickyKeyHash = getStickyKeyHash(entry);
Consumer c = selector.select(stickyKeyHash);
- groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
- consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
+ if (c != null) {
+ groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
+ consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
+ } else {
+ entry.release();
+ }
}
AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());