You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/12/01 16:16:11 UTC
[pulsar] branch master updated: Key_Shared dispatcher with no connected consumers should be recreated if allowOutOfOrderDelivery changes (#13063)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 36c1c00 Key_Shared dispatcher with no connected consumers should be recreated if allowOutOfOrderDelivery changes (#13063)
36c1c00 is described below
commit 36c1c002547df647ec1584bad4783d6529481db0
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Thu Dec 2 01:15:04 2021 +0900
Key_Shared dispatcher with no connected consumers should be recreated if allowOutOfOrderDelivery changes (#13063)
---
...istentStickyKeyDispatcherMultipleConsumers.java | 4 +++
.../nonpersistent/NonPersistentSubscription.java | 5 ++-
...istentStickyKeyDispatcherMultipleConsumers.java | 13 +++++---
.../service/persistent/PersistentSubscription.java | 6 ++--
.../client/api/KeySharedSubscriptionTest.java | 38 ++++++++++++++++++++++
5 files changed, 55 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 878bac8..0dac0a2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -164,4 +164,8 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
public KeySharedMode getKeySharedMode() {
return keySharedMode;
}
+
+ public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
+ return (ksm.getKeySharedMode() == this.keySharedMode);
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 59fab2d..056fd0f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -143,10 +143,9 @@ public class NonPersistentSubscription implements Subscription {
break;
case Key_Shared:
KeySharedMeta ksm = consumer.getKeySharedMeta();
- keySharedMode = ksm.getKeySharedMode();
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
- || ((NonPersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
- != keySharedMode) {
+ || !((NonPersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
+ .hasSameKeySharedPolicy(ksm)) {
previousDispatcher = dispatcher;
this.dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, ksm);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 5c8f33e..116a9eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -448,6 +448,15 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return this.keySharedMode;
}
+ public boolean isAllowOutOfOrderDelivery() {
+ return this.allowOutOfOrderDelivery;
+ }
+
+ public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
+ return (ksm.getKeySharedMode() == this.keySharedMode
+ && ksm.isAllowOutOfOrderDelivery() == this.allowOutOfOrderDelivery);
+ }
+
public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
return recentlyJoinedConsumers;
}
@@ -456,10 +465,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return selector.getConsumerKeyHashRanges();
}
- public boolean isAllowOutOfOrderDelivery() {
- return allowOutOfOrderDelivery;
- }
-
private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 061d038..1d878e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -70,7 +70,6 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
-import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.api.proto.TxnAction;
@@ -270,10 +269,9 @@ public class PersistentSubscription implements Subscription {
break;
case Key_Shared:
KeySharedMeta ksm = consumer.getKeySharedMeta();
- KeySharedMode keySharedMode = ksm.getKeySharedMode();
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
- || ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
- != keySharedMode) {
+ || !((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
+ .hasSameKeySharedPolicy(ksm)) {
previousDispatcher = dispatcher;
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
topic.getBrokerService().getPulsar().getConfiguration(), ksm);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 1349e6c..a78e64d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -1068,6 +1069,43 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
consumer1.close();
}
+ @Test
+ public void testAllowOutOfOrderDeliveryChangedAfterAllConsumerDisconnected() throws Exception {
+ final String topicName = "persistent://public/default/change-allow-ooo-delivery-" + UUID.randomUUID();
+ final String subName = "my-sub";
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(true))
+ .subscribe();
+
+ CompletableFuture<Optional<Topic>> future = pulsar.getBrokerService().getTopicIfExists(topicName);
+ assertTrue(future.isDone());
+ assertTrue(future.get().isPresent());
+ Topic topic = future.get().get();
+ PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+ (PersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subName).getDispatcher();
+ assertTrue(dispatcher.isAllowOutOfOrderDelivery());
+ consumer.close();
+
+ consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(false))
+ .subscribe();
+
+ future = pulsar.getBrokerService().getTopicIfExists(topicName);
+ assertTrue(future.isDone());
+ assertTrue(future.get().isPresent());
+ topic = future.get().get();
+ dispatcher = (PersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subName).getDispatcher();
+ assertFalse(dispatcher.isAllowOutOfOrderDelivery());
+ consumer.close();
+ }
+
private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String subscription) {
if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) {
return ((PersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription)