You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2024/04/19 02:54:01 UTC
(pulsar) branch branch-3.0 updated: [fix][broker] Fix NPE causing dispatching to stop when using Key_Shared mode and allowOutOfOrderDelivery=true (#22533)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 53d784815c3 [fix][broker] Fix NPE causing dispatching to stop when using Key_Shared mode and allowOutOfOrderDelivery=true (#22533)
53d784815c3 is described below
commit 53d784815c3027f3fce456240a28694ee2730f4b
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Fri Apr 19 09:13:19 2024 +0800
[fix][broker] Fix NPE causing dispatching to stop when using Key_Shared mode and allowOutOfOrderDelivery=true (#22533)
(cherry picked from commit 2badcf6bd0be1aad2a5ec6da552185b4ef5b745b)
---
...PersistentStickyKeyDispatcherMultipleConsumers.java | 10 ++++++++++
.../pulsar/client/api/KeySharedSubscriptionTest.java | 18 ++++++++++++++++--
2 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index ee2ebd7ca86..2df9f38531f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -457,6 +457,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
@Override
protected synchronized NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {
+ // The variable "hashesToBeBlocked" and "recentlyJoinedConsumers" will be null if "isAllowOutOfOrderDelivery()",
+ // So skip this filter out.
+ if (isAllowOutOfOrderDelivery()) {
+ return src;
+ }
if (src.isEmpty()) {
return src;
}
@@ -501,6 +506,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
*/
@Override
protected boolean hasConsumersNeededNormalRead() {
+ // The variable "hashesToBeBlocked" and "recentlyJoinedConsumers" will be null if "isAllowOutOfOrderDelivery()",
+ // So the method "filterOutEntriesWillBeDiscarded" will filter out nothing, just return "true" here.
+ if (isAllowOutOfOrderDelivery()) {
+ return true;
+ }
for (Consumer consumer : consumerList) {
if (consumer == null || consumer.isBlocked()) {
continue;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 72195550508..27aa98597ec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -1741,6 +1741,14 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
admin.topics().delete(topic, false);
}
+ @DataProvider(name = "allowKeySharedOutOfOrder")
+ public Object[][] allowKeySharedOutOfOrder() {
+ return new Object[][]{
+ {true},
+ {false}
+ };
+ }
+
/**
* This test is in order to guarantee the feature added by https://github.com/apache/pulsar/pull/7105.
* 1. Start 3 consumers:
@@ -1755,8 +1763,8 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
* - no repeated Read-and-discard.
* - at last, all messages will be received.
*/
- @Test(timeOut = 180 * 1000) // the test will be finished in 60s.
- public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
+ @Test(timeOut = 180 * 1000, dataProvider = "allowKeySharedOutOfOrder") // the test will be finished in 60s.
+ public void testRecentJoinedPosWillNotStuckOtherConsumer(boolean allowKeySharedOutOfOrder) throws Exception {
final int messagesSentPerTime = 100;
final Set<Integer> totalReceivedMessages = new TreeSet<>();
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
@@ -1775,6 +1783,8 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
log.info("Published message :{}", messageId);
}
+ KeySharedPolicy keySharedPolicy = KeySharedPolicy.autoSplitHashRange()
+ .setAllowOutOfOrderDelivery(allowKeySharedOutOfOrder);
// 1. Start 3 consumers and make ack holes.
// - one consumer will be closed and trigger a messages redeliver.
// - one consumer will not ack any messages to make the new consumer joined late will be stuck due to the
@@ -1785,18 +1795,21 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
.subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(SubscriptionType.Key_Shared)
+ .keySharedPolicy(keySharedPolicy)
.subscribe();
Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(SubscriptionType.Key_Shared)
+ .keySharedPolicy(keySharedPolicy)
.subscribe();
Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(SubscriptionType.Key_Shared)
+ .keySharedPolicy(keySharedPolicy)
.subscribe();
List<Message> msgList1 = new ArrayList<>();
List<Message> msgList2 = new ArrayList<>();
@@ -1845,6 +1858,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
.subscriptionName(subName)
.receiverQueueSize(1000)
.subscriptionType(SubscriptionType.Key_Shared)
+ .keySharedPolicy(keySharedPolicy)
.subscribe();
consumerWillBeClose.close();