You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/18 07:19:07 UTC
[pulsar] 04/04: Fix wrong key-hash selector used for new consumers
after all the previous consumers disconnected (#12035)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4cc082170cb7346acf808a4842a2634339c3853d
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Sep 17 14:35:02 2021 +0800
Fix wrong key-hash selector used for new consumers after all the previous consumers disconnected (#12035)
The issue occurs after all the previous consumers have disconnected and new consumers then connect
to the topic with a different key_shared policy.
The root cause is that the previous dispatcher is still used after the key_shared policy has changed, so the fix
is to create a new dispatcher when a new consumer connects with a different key_shared policy (that is, a different key-shared mode).
(cherry picked from commit 3a4755f50ef46c3d94ce9629478941d5224cb800)
---
...istentStickyKeyDispatcherMultipleConsumers.java | 43 +++++++++++++
.../nonpersistent/NonPersistentSubscription.java | 35 +++--------
...istentStickyKeyDispatcherMultipleConsumers.java | 10 ++-
.../service/persistent/PersistentSubscription.java | 8 ++-
.../client/api/KeySharedSubscriptionTest.java | 71 ++++++++++++++++++++++
5 files changed, 136 insertions(+), 31 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 704fd93..878bac8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,28 +18,67 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
+import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.protocol.Commands;
public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {
private final StickyKeyConsumerSelector selector;
+ private final KeySharedMode keySharedMode;
public NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription,
+ KeySharedMeta ksm) {
+ super(topic, subscription);
+ this.keySharedMode = ksm.getKeySharedMode();
+ switch (this.keySharedMode) {
+ case STICKY:
+ this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
+ break;
+
+ case AUTO_SPLIT:
+ default:
+ ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
+ if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
+ this.selector = new ConsistentHashingStickyKeyConsumerSelector(
+ conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
+ } else {
+ this.selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
+ }
+ break;
+ }
+ }
+
+ @VisibleForTesting
+ NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription,
StickyKeyConsumerSelector selector) {
super(topic, subscription);
+ if (selector instanceof HashRangeExclusiveStickyKeyConsumerSelector) {
+ keySharedMode = KeySharedMode.STICKY;
+ } else if (selector instanceof ConsistentHashingStickyKeyConsumerSelector
+ || selector instanceof HashRangeAutoSplitStickyKeyConsumerSelector) {
+ keySharedMode = KeySharedMode.AUTO_SPLIT;
+ } else {
+ keySharedMode = null;
+ }
this.selector = selector;
}
@@ -121,4 +160,8 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
}
}
}
+
+ public KeySharedMode getKeySharedMode() {
+ return keySharedMode;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index a400fae..a392e41 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -28,22 +28,19 @@ import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
-import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
-import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
@@ -143,29 +140,13 @@ public class NonPersistentSubscription implements Subscription {
}
break;
case Key_Shared:
- if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
+ KeySharedMeta ksm = consumer.getKeySharedMeta();
+ KeySharedMode keySharedMode = ksm.getKeySharedMode();
+ if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
+ || ((NonPersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
+ != keySharedMode) {
previousDispatcher = dispatcher;
-
- switch (consumer.getKeySharedMeta().getKeySharedMode()) {
- case STICKY:
- dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
- new HashRangeExclusiveStickyKeyConsumerSelector());
- break;
-
- case AUTO_SPLIT:
- default:
- StickyKeyConsumerSelector selector;
- ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
- if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
- selector = new ConsistentHashingStickyKeyConsumerSelector(
- conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
- } else {
- selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
- }
-
- dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, selector);
- break;
- }
+ this.dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, ksm);
}
break;
default:
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index d4d64e2..e62f63e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +58,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
private final StickyKeyConsumerSelector selector;
private boolean isDispatcherStuckOnReplays = false;
+ private final KeySharedMode keySharedMode;
/**
* When a consumer joins, it will be added to this map with the current read position.
@@ -76,8 +78,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>();
this.stuckConsumers = new HashSet<>();
this.nextStuckConsumers = new HashSet<>();
-
- switch (ksm.getKeySharedMode()) {
+ this.keySharedMode = ksm.getKeySharedMode();
+ switch (this.keySharedMode) {
case AUTO_SPLIT:
if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
selector = new ConsistentHashingStickyKeyConsumerSelector(
@@ -408,6 +410,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true);
}
+ public KeySharedMode getKeySharedMode() {
+ return this.keySharedMode;
+ }
+
public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
return recentlyJoinedConsumers;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index cf01ace..1eb99e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -66,6 +66,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.api.proto.TxnAction;
@@ -256,9 +257,12 @@ public class PersistentSubscription implements Subscription {
}
break;
case Key_Shared:
- if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
+ KeySharedMeta ksm = consumer.getKeySharedMeta();
+ KeySharedMode keySharedMode = ksm.getKeySharedMode();
+ if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
+ || ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
+ != keySharedMode) {
previousDispatcher = dispatcher;
- KeySharedMeta ksm = consumer.getKeySharedMeta();
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
topic.getBrokerService().getPulsar().getConfiguration(), ksm);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index e785ac1..bc19852 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -39,6 +40,7 @@ import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -47,8 +49,12 @@ import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.common.api.proto.KeySharedMode;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
@@ -93,6 +99,14 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
};
}
+ @DataProvider(name = "topicDomain")
+ public Object[][] topicDomainProvider() {
+ return new Object[][] {
+ { "persistent" },
+ { "non-persistent" }
+ };
+ }
+
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
@@ -1012,6 +1026,63 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
});
}
+ @Test(dataProvider = "topicDomain")
+ public void testSelectorChangedAfterAllConsumerDisconnected(String topicDomain) throws PulsarClientException,
+ ExecutionException, InterruptedException {
+ final String topicName = TopicName.get(topicDomain, "public", "default",
+ "testSelectorChangedAfterAllConsumerDisconnected" + UUID.randomUUID()).toString();
+
+ final String subName = "my-sub";
+
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .consumerName("first-consumer")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
+ .cryptoKeyReader(new EncKeyReader())
+ .subscribe();
+
+ CompletableFuture<Optional<Topic>> future = pulsar.getBrokerService().getTopicIfExists(topicName);
+ assertTrue(future.isDone());
+ assertTrue(future.get().isPresent());
+ Topic topic = future.get().get();
+ KeySharedMode keySharedMode = getKeySharedModeOfSubscription(topic, subName);
+ assertNotNull(keySharedMode);
+ assertEquals(keySharedMode, KeySharedMode.AUTO_SPLIT);
+
+ consumer1.close();
+
+ consumer1 = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .consumerName("second-consumer")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 65535)))
+ .cryptoKeyReader(new EncKeyReader())
+ .subscribe();
+
+ future = pulsar.getBrokerService().getTopicIfExists(topicName);
+ assertTrue(future.isDone());
+ assertTrue(future.get().isPresent());
+ topic = future.get().get();
+ keySharedMode = getKeySharedModeOfSubscription(topic, subName);
+ assertNotNull(keySharedMode);
+ assertEquals(keySharedMode, KeySharedMode.STICKY);
+ consumer1.close();
+ }
+
+ private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String subscription) {
+ if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) {
+ return ((PersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription)
+ .getDispatcher()).getKeySharedMode();
+ } else if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.non_persistent)) {
+ return ((NonPersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription)
+ .getDispatcher()).getKeySharedMode();
+ }
+ return null;
+ }
+
private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException {
return pulsarClient.newConsumer(Schema.STRING)
.topic(topic)