You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/12/16 17:30:00 UTC

[pulsar] branch master updated: fix npe (#8969)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 19242a6  fix npe (#8969)
19242a6 is described below

commit 19242a63e8890c790690646be7578f8d1cb25ed2
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Dec 17 01:29:26 2020 +0800

    fix npe (#8969)
---
 ...istentStickyKeyDispatcherMultipleConsumers.java |  2 +-
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 29 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 11b6f9f..e28ba59 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -175,7 +175,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             Consumer consumer = current.getKey();
             List<Entry> entriesWithSameKey = current.getValue();
             int entriesWithSameKeyCount = entriesWithSameKey.size();
-            final int availablePermits = Math.max(consumer.getAvailablePermits(), 0);
+            final int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0);
             int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
             int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey,
                     maxMessagesForC, readType);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index c281400..a902ac2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -166,6 +166,35 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
     }
 
+    @Test(timeOut = 10000)
+    public void testSendMessage() {
+        KeySharedMeta keySharedMeta = KeySharedMeta.newBuilder().setKeySharedMode(PulsarApi.KeySharedMode.STICKY).build();
+        PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
+                topicMock, cursorMock, subscriptionMock, configMock, keySharedMeta);
+        try {
+            PulsarApi.IntRange.newBuilder().setStart(0).setEnd(9).build();
+            keySharedMeta = PulsarApi.KeySharedMeta.newBuilder().setKeySharedMode(PulsarApi.KeySharedMode.STICKY)
+                    .addHashRanges(PulsarApi.IntRange.newBuilder().setStart(0).setEnd(9).build()).build();
+            Consumer consumerMock = mock(Consumer.class);
+            doReturn(keySharedMeta).when(consumerMock).getKeySharedMeta();
+            persistentDispatcher.addConsumer(consumerMock);
+            persistentDispatcher.consumerFlow(consumerMock, 1000);
+        } catch (Exception e) {
+            fail("Failed to add mock consumer", e);
+        }
+
+        List<Entry> entries = new ArrayList<>();
+        entries.add(EntryImpl.create(1, 1, createMessage("message1", 1)));
+        entries.add(EntryImpl.create(1, 2, createMessage("message2", 2)));
+
+        try {
+            // Should succeed; see issue #8960.
+            persistentDispatcher.readEntriesComplete(entries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+        } catch (Exception e) {
+            fail("Failed to readEntriesComplete.", e);
+        }
+    }
+
     @Test
     public void testSkipRedeliverTemporally() {
         final Consumer slowConsumerMock = mock(Consumer.class);