You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/11 14:27:22 UTC
[pulsar] 06/10: Fix wrong key-hash selector used for new consumers after all the previous consumers disconnected (#12035)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 141f2508f9543fcd6bfa368fe9a2cfe8e71ab93f
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Sep 17 14:35:02 2021 +0800
Fix wrong key-hash selector used for new consumers after all the previous consumers disconnected (#12035)
We will encounter the issue after all the previous consumers disconnected and the new consumers connect
to the topic with different key_shared policy.
The root cause is we are using the previous dispatcher after the key_shared policy changed, so the fix
is to use a new dispatcher after a new consumer with a different key-shared policy
(cherry picked from commit 3a4755f50ef46c3d94ce9629478941d5224cb800)
---
...istentStickyKeyDispatcherMultipleConsumers.java | 42 +++++++++++++
.../nonpersistent/NonPersistentSubscription.java | 36 +++--------
...istentStickyKeyDispatcherMultipleConsumers.java | 10 ++-
.../service/persistent/PersistentSubscription.java | 10 ++-
.../broker/service/PersistentTopicE2ETest.java | 6 +-
.../client/api/KeySharedSubscriptionTest.java | 71 +++++++++++++++++++++-
6 files changed, 138 insertions(+), 37 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index c1c0227..68ff404 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
@@ -26,21 +27,58 @@ import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
+import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {
private final StickyKeyConsumerSelector selector;
+ private final PulsarApi.KeySharedMode keySharedMode;
public NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription,
+ PulsarApi.KeySharedMeta ksm) {
+ super(topic, subscription);
+ this.keySharedMode = ksm.getKeySharedMode();
+ switch (this.keySharedMode) {
+ case STICKY:
+ this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
+ break;
+
+ case AUTO_SPLIT:
+ default:
+ ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
+ if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
+ this.selector = new ConsistentHashingStickyKeyConsumerSelector(
+ conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
+ } else {
+ this.selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
+ }
+ break;
+ }
+ }
+
+ @VisibleForTesting
+ NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription,
StickyKeyConsumerSelector selector) {
super(topic, subscription);
+ if (selector instanceof HashRangeExclusiveStickyKeyConsumerSelector) {
+ keySharedMode = PulsarApi.KeySharedMode.STICKY;
+ } else if (selector instanceof ConsistentHashingStickyKeyConsumerSelector
+ || selector instanceof HashRangeAutoSplitStickyKeyConsumerSelector) {
+ keySharedMode = PulsarApi.KeySharedMode.AUTO_SPLIT;
+ } else {
+ keySharedMode = null;
+ }
this.selector = selector;
}
@@ -105,4 +143,8 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
}
}
+
+ public PulsarApi.KeySharedMode getKeySharedMode() {
+ return keySharedMode;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 7aa26fa..7b0600b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -28,7 +28,6 @@ import com.google.common.base.MoreObjects;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -36,12 +35,9 @@ import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyE
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
-import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
@@ -135,30 +131,14 @@ public class NonPersistentSubscription implements Subscription {
}
break;
case Key_Shared:
- if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
+ KeySharedMeta ksm = consumer.getKeySharedMeta() != null ? consumer.getKeySharedMeta()
+ : KeySharedMeta.getDefaultInstance();
+ PulsarApi.KeySharedMode keySharedMode = ksm.getKeySharedMode();
+ if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
+ || ((NonPersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
+ != keySharedMode) {
previousDispatcher = dispatcher;
- KeySharedMeta ksm = consumer.getKeySharedMeta() != null ? consumer.getKeySharedMeta() : KeySharedMeta.getDefaultInstance();
-
- switch (ksm.getKeySharedMode()) {
- case STICKY:
- dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
- new HashRangeExclusiveStickyKeyConsumerSelector());
- break;
-
- case AUTO_SPLIT:
- default:
- StickyKeyConsumerSelector selector;
- ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
- if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
- selector = new ConsistentHashingStickyKeyConsumerSelector(
- conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
- } else {
- selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
- }
-
- dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, selector);
- break;
- }
+ this.dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, ksm);
}
break;
default:
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 596c9c7..51bec23 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelec
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
import org.slf4j.Logger;
@@ -59,6 +60,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
private final StickyKeyConsumerSelector selector;
private boolean isDispatcherStuckOnReplays = false;
+ private final PulsarApi.KeySharedMode keySharedMode;
/**
* When a consumer joins, it will be added to this map with the current read position.
@@ -78,8 +80,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>();
this.stuckConsumers = new HashSet<>();
this.nextStuckConsumers = new HashSet<>();
-
- switch (ksm.getKeySharedMode()) {
+ this.keySharedMode = ksm.getKeySharedMode();
+ switch (this.keySharedMode) {
case AUTO_SPLIT:
if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
selector = new ConsistentHashingStickyKeyConsumerSelector(
@@ -401,6 +403,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true);
}
+ public PulsarApi.KeySharedMode getKeySharedMode() {
+ return this.keySharedMode;
+ }
+
public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
return recentlyJoinedConsumers;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index c39d4bf..5350baf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
@@ -212,10 +213,13 @@ public class PersistentSubscription implements Subscription {
}
break;
case Key_Shared:
- if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
+ KeySharedMeta ksm = consumer.getKeySharedMeta() != null ? consumer.getKeySharedMeta()
+ : KeySharedMeta.getDefaultInstance();
+ PulsarApi.KeySharedMode keySharedMode = ksm.getKeySharedMode();
+ if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
+ || ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
+ != keySharedMode) {
previousDispatcher = dispatcher;
- KeySharedMeta ksm = consumer.getKeySharedMeta() != null ? consumer.getKeySharedMeta()
- : KeySharedMeta.getDefaultInstance();
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
topic.getBrokerService().getPulsar().getConfiguration(), ksm);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index f9c7e18..ba35185 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1701,7 +1701,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
.producerName("xxx")
.create();
- assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+ assertEquals(admin.topics().getStats(topicName).publishers.size(), 1);
for (int i =0; i < 5; i++) {
try {
@@ -1714,7 +1714,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
// Expected
}
- assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+ assertEquals(admin.topics().getStats(topicName).publishers.size(), 1);
}
// Try from different connection
@@ -1733,6 +1733,6 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
// Expected
}
- assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+ assertEquals(admin.topics().getStats(topicName).publishers.size(), 1);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index f8dad41..7a6f04a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -36,6 +37,7 @@ import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -44,9 +46,13 @@ import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
@@ -90,7 +96,15 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
};
}
- @BeforeMethod
+ @DataProvider(name = "topicDomain")
+ public Object[][] topicDomainProvider() {
+ return new Object[][] {
+ { "persistent" },
+ { "non-persistent" }
+ };
+ }
+
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
@@ -937,6 +951,61 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
client.close();
}
+ @Test(dataProvider = "topicDomain")
+ public void testSelectorChangedAfterAllConsumerDisconnected(String topicDomain) throws PulsarClientException,
+ ExecutionException, InterruptedException {
+ final String topicName = TopicName.get(topicDomain, "public", "default",
+ "testSelectorChangedAfterAllConsumerDisconnected" + UUID.randomUUID()).toString();
+
+ final String subName = "my-sub";
+
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .consumerName("first-consumer")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
+ .subscribe();
+
+ CompletableFuture<Optional<Topic>> future = pulsar.getBrokerService().getTopicIfExists(topicName);
+ assertTrue(future.isDone());
+ assertTrue(future.get().isPresent());
+ Topic topic = future.get().get();
+ PulsarApi.KeySharedMode keySharedMode = getKeySharedModeOfSubscription(topic, subName);
+ assertNotNull(keySharedMode);
+ assertEquals(keySharedMode, PulsarApi.KeySharedMode.AUTO_SPLIT);
+
+ consumer1.close();
+
+ consumer1 = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .consumerName("second-consumer")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 65535)))
+ .subscribe();
+
+ future = pulsar.getBrokerService().getTopicIfExists(topicName);
+ assertTrue(future.isDone());
+ assertTrue(future.get().isPresent());
+ topic = future.get().get();
+ keySharedMode = getKeySharedModeOfSubscription(topic, subName);
+ assertNotNull(keySharedMode);
+ assertEquals(keySharedMode, PulsarApi.KeySharedMode.STICKY);
+ consumer1.close();
+ }
+
+ private PulsarApi.KeySharedMode getKeySharedModeOfSubscription(Topic topic, String subscription) {
+ if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) {
+ return ((PersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription)
+ .getDispatcher()).getKeySharedMode();
+ } else if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.non_persistent)) {
+ return ((NonPersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription)
+ .getDispatcher()).getKeySharedMode();
+ }
+ return null;
+ }
+
private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException {
return pulsarClient.newConsumer(Schema.STRING)
.topic(topic)