You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/25 04:47:37 UTC

[pulsar] 08/11: [Broker] Handle NPE when full key range isn't covered with active consumers (#11749)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9287e873df8f13f9ac88846eaa49dd411a8ebc4a
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Mon Aug 23 21:15:14 2021 +0300

    [Broker] Handle NPE when full key range isn't covered with active consumers (#11749)
    
    (cherry picked from commit 8027ab4e8763486de16d4b2f850b234b70a16b27)
---
 .../PersistentStickyKeyDispatcherMultipleConsumers.java           | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index c23b360..d4d64e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -175,8 +175,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         for (Entry entry : entries) {
             int stickyKeyHash = getStickyKeyHash(entry);
             Consumer c = selector.select(stickyKeyHash);
-            groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
-            consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
+            if (c != null) {
+                groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
+                consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
+            } else {
+                entry.release();
+            }
         }
 
         AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());