You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2024/04/19 02:54:01 UTC

(pulsar) branch branch-3.0 updated: [fix][broker] Fix NPE causing dispatching to stop when using Key_Shared mode and allowOutOfOrderDelivery=true (#22533)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 53d784815c3 [fix][broker] Fix NPE causing dispatching to stop when using Key_Shared mode and allowOutOfOrderDelivery=true (#22533)
53d784815c3 is described below

commit 53d784815c3027f3fce456240a28694ee2730f4b
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Fri Apr 19 09:13:19 2024 +0800

    [fix][broker] Fix NPE causing dispatching to stop when using Key_Shared mode and allowOutOfOrderDelivery=true (#22533)
    
    (cherry picked from commit 2badcf6bd0be1aad2a5ec6da552185b4ef5b745b)
---
 ...PersistentStickyKeyDispatcherMultipleConsumers.java | 10 ++++++++++
 .../pulsar/client/api/KeySharedSubscriptionTest.java   | 18 ++++++++++++++++--
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index ee2ebd7ca86..2df9f38531f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -457,6 +457,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
     @Override
     protected synchronized NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {
+        // The variables "hashesToBeBlocked" and "recentlyJoinedConsumers" are null if "isAllowOutOfOrderDelivery()",
+        // so skip the filtering and return the input unchanged.
+        if (isAllowOutOfOrderDelivery()) {
+            return src;
+        }
         if (src.isEmpty()) {
             return src;
         }
@@ -501,6 +506,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
      */
     @Override
     protected boolean hasConsumersNeededNormalRead() {
+        // The variables "hashesToBeBlocked" and "recentlyJoinedConsumers" are null if "isAllowOutOfOrderDelivery()",
+        // so the method "filterOutEntriesWillBeDiscarded" filters out nothing; just return "true" here.
+        if (isAllowOutOfOrderDelivery()) {
+            return true;
+        }
         for (Consumer consumer : consumerList) {
             if (consumer == null || consumer.isBlocked()) {
                 continue;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 72195550508..27aa98597ec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -1741,6 +1741,14 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         admin.topics().delete(topic, false);
     }
 
+    @DataProvider(name = "allowKeySharedOutOfOrder")
+    public Object[][] allowKeySharedOutOfOrder() {
+        return new Object[][]{
+                {true},
+                {false}
+        };
+    }
+
     /**
      * This test is in order to guarantee the feature added by https://github.com/apache/pulsar/pull/7105.
      * 1. Start 3 consumers:
@@ -1755,8 +1763,8 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
      *   - no repeated Read-and-discard.
      *   - at last, all messages will be received.
      */
-    @Test(timeOut = 180 * 1000) // the test will be finished in 60s.
-    public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
+    @Test(timeOut = 180 * 1000, dataProvider = "allowKeySharedOutOfOrder") // the test will be finished in 60s.
+    public void testRecentJoinedPosWillNotStuckOtherConsumer(boolean allowKeySharedOutOfOrder) throws Exception {
         final int messagesSentPerTime = 100;
         final Set<Integer> totalReceivedMessages = new TreeSet<>();
         final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
@@ -1775,6 +1783,8 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
             log.info("Published message :{}", messageId);
         }
 
+        KeySharedPolicy keySharedPolicy = KeySharedPolicy.autoSplitHashRange()
+                .setAllowOutOfOrderDelivery(allowKeySharedOutOfOrder);
         // 1. Start 3 consumers and make ack holes.
         //   - one consumer will be closed and trigger a messages redeliver.
         //   - one consumer will not ack any messages to make the new consumer joined late will be stuck due to the
@@ -1785,18 +1795,21 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
                 .subscriptionName(subName)
                 .receiverQueueSize(10)
                 .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(keySharedPolicy)
                 .subscribe();
         Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .subscriptionName(subName)
                 .receiverQueueSize(10)
                 .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(keySharedPolicy)
                 .subscribe();
         Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .subscriptionName(subName)
                 .receiverQueueSize(10)
                 .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(keySharedPolicy)
                 .subscribe();
         List<Message> msgList1 = new ArrayList<>();
         List<Message> msgList2 = new ArrayList<>();
@@ -1845,6 +1858,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
                 .subscriptionName(subName)
                 .receiverQueueSize(1000)
                 .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(keySharedPolicy)
                 .subscribe();
         consumerWillBeClose.close();