You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/12/16 17:30:00 UTC
[pulsar] branch master updated: fix npe (#8969)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 19242a6 fix npe (#8969)
19242a6 is described below
commit 19242a63e8890c790690646be7578f8d1cb25ed2
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Dec 17 01:29:26 2020 +0800
fix npe (#8969)
---
...istentStickyKeyDispatcherMultipleConsumers.java | 2 +-
...ntStickyKeyDispatcherMultipleConsumersTest.java | 29 ++++++++++++++++++++++
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 11b6f9f..e28ba59 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -175,7 +175,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
Consumer consumer = current.getKey();
List<Entry> entriesWithSameKey = current.getValue();
int entriesWithSameKeyCount = entriesWithSameKey.size();
- final int availablePermits = Math.max(consumer.getAvailablePermits(), 0);
+ final int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0);
int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey,
maxMessagesForC, readType);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index c281400..a902ac2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -166,6 +166,35 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
}
+ @Test(timeOut = 10000) // Regression test for issue #8960: readEntriesComplete must complete without throwing.
+ public void testSendMessage() {
+ KeySharedMeta keySharedMeta = KeySharedMeta.newBuilder().setKeySharedMode(PulsarApi.KeySharedMode.STICKY).build();
+ PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
+ topicMock, cursorMock, subscriptionMock, configMock, keySharedMeta); // dispatcher under test, built from fixtures declared elsewhere in this test class
+ try {
+ PulsarApi.IntRange.newBuilder().setStart(0).setEnd(9).build(); // NOTE(review): the built value is discarded; this statement appears to have no effect
+ keySharedMeta = PulsarApi.KeySharedMeta.newBuilder().setKeySharedMode(PulsarApi.KeySharedMode.STICKY)
+ .addHashRanges(PulsarApi.IntRange.newBuilder().setStart(0).setEnd(9).build()).build(); // STICKY-mode metadata with one hash range (start 0, end 9)
+ Consumer consumerMock = mock(Consumer.class);
+ doReturn(keySharedMeta).when(consumerMock).getKeySharedMeta(); // the mock consumer reports the metadata built above
+ persistentDispatcher.addConsumer(consumerMock);
+ persistentDispatcher.consumerFlow(consumerMock, 1000); // NOTE(review): 1000 is presumably the number of permits granted; confirm against consumerFlow
+ } catch (Exception e) {
+ fail("Failed to add mock consumer", e);
+ }
+
+ List<Entry> entries = new ArrayList<>(); // the two entries handed to the dispatcher below
+ entries.add(EntryImpl.create(1, 1, createMessage("message1", 1)));
+ entries.add(EntryImpl.create(1, 2, createMessage("message2", 2)));
+
+ try {
+ // Should succeed without throwing; see issue #8960.
+ persistentDispatcher.readEntriesComplete(entries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+ } catch (Exception e) {
+ fail("Failed to readEntriesComplete.", e); // any exception here (e.g. the NPE this commit addresses) fails the test
+ }
+ }
+
@Test
public void testSkipRedeliverTemporally() {
final Consumer slowConsumerMock = mock(Consumer.class);