You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/12/01 16:16:11 UTC

[pulsar] branch master updated: Key_Shared dispatcher with no connected consumers should be recreated if allowOutOfOrderDelivery changes (#13063)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 36c1c00  Key_Shared dispatcher with no connected consumers should be recreated if allowOutOfOrderDelivery changes (#13063)
36c1c00 is described below

commit 36c1c002547df647ec1584bad4783d6529481db0
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Thu Dec 2 01:15:04 2021 +0900

    Key_Shared dispatcher with no connected consumers should be recreated if allowOutOfOrderDelivery changes (#13063)
---
 ...istentStickyKeyDispatcherMultipleConsumers.java |  4 +++
 .../nonpersistent/NonPersistentSubscription.java   |  5 ++-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 13 +++++---
 .../service/persistent/PersistentSubscription.java |  6 ++--
 .../client/api/KeySharedSubscriptionTest.java      | 38 ++++++++++++++++++++++
 5 files changed, 55 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 878bac8..0dac0a2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -164,4 +164,8 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
     public KeySharedMode getKeySharedMode() {
         return keySharedMode;
     }
+
+    public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
+        return (ksm.getKeySharedMode() == this.keySharedMode);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 59fab2d..056fd0f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -143,10 +143,9 @@ public class NonPersistentSubscription implements Subscription {
                 break;
             case Key_Shared:
                 KeySharedMeta ksm = consumer.getKeySharedMeta();
-                keySharedMode = ksm.getKeySharedMode();
                 if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
-                        || ((NonPersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
-                        != keySharedMode) {
+                        || !((NonPersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
+                                .hasSameKeySharedPolicy(ksm)) {
                     previousDispatcher = dispatcher;
                     this.dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, ksm);
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 5c8f33e..116a9eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -448,6 +448,15 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         return this.keySharedMode;
     }
 
+    public boolean isAllowOutOfOrderDelivery() {
+        return this.allowOutOfOrderDelivery;
+    }
+
+    public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
+        return (ksm.getKeySharedMode() == this.keySharedMode
+                && ksm.isAllowOutOfOrderDelivery() == this.allowOutOfOrderDelivery);
+    }
+
     public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
         return recentlyJoinedConsumers;
     }
@@ -456,10 +465,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         return selector.getConsumerKeyHashRanges();
     }
 
-    public boolean isAllowOutOfOrderDelivery() {
-        return allowOutOfOrderDelivery;
-    }
-
     private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
 
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 061d038..1d878e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -70,7 +70,6 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
-import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
 import org.apache.pulsar.common.api.proto.TxnAction;
@@ -270,10 +269,9 @@ public class PersistentSubscription implements Subscription {
                             break;
                         case Key_Shared:
                             KeySharedMeta ksm = consumer.getKeySharedMeta();
-                            KeySharedMode keySharedMode = ksm.getKeySharedMode();
                             if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
-                                    || ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
-                                    != keySharedMode) {
+                                    || !((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
+                                            .hasSameKeySharedPolicy(ksm)) {
                                 previousDispatcher = dispatcher;
                                 dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
                                         topic.getBrokerService().getPulsar().getConfiguration(), ksm);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 1349e6c..a78e64d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -1068,6 +1069,43 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         consumer1.close();
     }
 
+    @Test
+    public void testAllowOutOfOrderDeliveryChangedAfterAllConsumerDisconnected() throws Exception {
+        final String topicName = "persistent://public/default/change-allow-ooo-delivery-" + UUID.randomUUID();
+        final String subName = "my-sub";
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(true))
+                .subscribe();
+
+        CompletableFuture<Optional<Topic>> future = pulsar.getBrokerService().getTopicIfExists(topicName);
+        assertTrue(future.isDone());
+        assertTrue(future.get().isPresent());
+        Topic topic = future.get().get();
+        PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+                (PersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subName).getDispatcher();
+        assertTrue(dispatcher.isAllowOutOfOrderDelivery());
+        consumer.close();
+
+        consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(false))
+                .subscribe();
+
+        future = pulsar.getBrokerService().getTopicIfExists(topicName);
+        assertTrue(future.isDone());
+        assertTrue(future.get().isPresent());
+        topic = future.get().get();
+        dispatcher = (PersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subName).getDispatcher();
+        assertFalse(dispatcher.isAllowOutOfOrderDelivery());
+        consumer.close();
+    }
+
     private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String subscription) {
         if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) {
             return ((PersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription)