You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/18 07:19:07 UTC

[pulsar] 04/04: Fix wrong key-hash selector used for new consumers after all the previous consumers disconnected (#12035)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4cc082170cb7346acf808a4842a2634339c3853d
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Sep 17 14:35:02 2021 +0800

    Fix wrong key-hash selector used for new consumers after all the previous consumers disconnected (#12035)
    
    The issue occurs when all the previous consumers have disconnected and new consumers then connect
    to the topic with a different key_shared policy.
    
    The root cause is that the previous dispatcher was still being used after the key_shared policy changed,
    so the fix is to create a new dispatcher when a new consumer connects with a different key_shared policy.
    
    (cherry picked from commit 3a4755f50ef46c3d94ce9629478941d5224cb800)
---
 ...istentStickyKeyDispatcherMultipleConsumers.java | 43 +++++++++++++
 .../nonpersistent/NonPersistentSubscription.java   | 35 +++--------
 ...istentStickyKeyDispatcherMultipleConsumers.java | 10 ++-
 .../service/persistent/PersistentSubscription.java |  8 ++-
 .../client/api/KeySharedSubscriptionTest.java      | 71 ++++++++++++++++++++++
 5 files changed, 136 insertions(+), 31 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 704fd93..878bac8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,28 +18,67 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
+import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.protocol.Commands;
 
 public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {
 
     private final StickyKeyConsumerSelector selector;
+    private final KeySharedMode keySharedMode;
 
     public NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription,
+                                                             KeySharedMeta ksm) {
+        super(topic, subscription);
+        this.keySharedMode = ksm.getKeySharedMode();
+        switch (this.keySharedMode) {
+            case STICKY:
+                this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
+                break;
+
+            case AUTO_SPLIT:
+            default:
+                ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
+                if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
+                    this.selector = new ConsistentHashingStickyKeyConsumerSelector(
+                            conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
+                } else {
+                    this.selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
+                }
+                break;
+        }
+    }
+
+    @VisibleForTesting
+    NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription,
                                                              StickyKeyConsumerSelector selector) {
         super(topic, subscription);
+        if (selector instanceof HashRangeExclusiveStickyKeyConsumerSelector) {
+            keySharedMode = KeySharedMode.STICKY;
+        } else if (selector instanceof ConsistentHashingStickyKeyConsumerSelector
+                || selector instanceof HashRangeAutoSplitStickyKeyConsumerSelector) {
+            keySharedMode = KeySharedMode.AUTO_SPLIT;
+        } else {
+            keySharedMode = null;
+        }
         this.selector = selector;
     }
 
@@ -121,4 +160,8 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
             }
         }
     }
+
+    public KeySharedMode getKeySharedMode() {
+        return keySharedMode;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index a400fae..a392e41 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -28,22 +28,19 @@ import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
-import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
-import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
@@ -143,29 +140,13 @@ public class NonPersistentSubscription implements Subscription {
                 }
                 break;
             case Key_Shared:
-                if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
+                KeySharedMeta ksm = consumer.getKeySharedMeta();
+                KeySharedMode keySharedMode = ksm.getKeySharedMode();
+                if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
+                        || ((NonPersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
+                        != keySharedMode) {
                     previousDispatcher = dispatcher;
-
-                    switch (consumer.getKeySharedMeta().getKeySharedMode()) {
-                        case STICKY:
-                            dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
-                                    new HashRangeExclusiveStickyKeyConsumerSelector());
-                            break;
-
-                        case AUTO_SPLIT:
-                        default:
-                            StickyKeyConsumerSelector selector;
-                            ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
-                            if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
-                                selector = new ConsistentHashingStickyKeyConsumerSelector(
-                                        conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
-                            } else {
-                                selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
-                            }
-
-                            dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, selector);
-                            break;
-                    }
+                    this.dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, ksm);
                 }
                 break;
             default:
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index d4d64e2..e62f63e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +58,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
     private final StickyKeyConsumerSelector selector;
 
     private boolean isDispatcherStuckOnReplays = false;
+    private final KeySharedMode keySharedMode;
 
     /**
      * When a consumer joins, it will be added to this map with the current read position.
@@ -76,8 +78,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>();
         this.stuckConsumers = new HashSet<>();
         this.nextStuckConsumers = new HashSet<>();
-
-        switch (ksm.getKeySharedMode()) {
+        this.keySharedMode = ksm.getKeySharedMode();
+        switch (this.keySharedMode) {
         case AUTO_SPLIT:
             if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
                 selector = new ConsistentHashingStickyKeyConsumerSelector(
@@ -408,6 +410,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true);
     }
 
+    public KeySharedMode getKeySharedMode() {
+        return this.keySharedMode;
+    }
+
     public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
         return recentlyJoinedConsumers;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index cf01ace..1eb99e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -66,6 +66,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
 import org.apache.pulsar.common.api.proto.TxnAction;
@@ -256,9 +257,12 @@ public class PersistentSubscription implements Subscription {
                             }
                             break;
                         case Key_Shared:
-                            if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
+                            KeySharedMeta ksm = consumer.getKeySharedMeta();
+                            KeySharedMode keySharedMode = ksm.getKeySharedMode();
+                            if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
+                                    || ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
+                                    != keySharedMode) {
                                 previousDispatcher = dispatcher;
-                                KeySharedMeta ksm = consumer.getKeySharedMeta();
                                 dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
                                         topic.getBrokerService().getPulsar().getConfiguration(), ksm);
                             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index e785ac1..bc19852 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -39,6 +40,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -47,8 +49,12 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.common.api.proto.KeySharedMode;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.awaitility.Awaitility;
@@ -93,6 +99,14 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         };
     }
 
+    @DataProvider(name = "topicDomain")
+    public Object[][] topicDomainProvider() {
+        return new Object[][] {
+                { "persistent" },
+                { "non-persistent" }
+        };
+    }
+
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
@@ -1012,6 +1026,63 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         });
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testSelectorChangedAfterAllConsumerDisconnected(String topicDomain) throws PulsarClientException,
+            ExecutionException, InterruptedException {
+        final String topicName = TopicName.get(topicDomain, "public", "default",
+                "testSelectorChangedAfterAllConsumerDisconnected" + UUID.randomUUID()).toString();
+
+        final String subName = "my-sub";
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .consumerName("first-consumer")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
+                .cryptoKeyReader(new EncKeyReader())
+                .subscribe();
+
+        CompletableFuture<Optional<Topic>> future = pulsar.getBrokerService().getTopicIfExists(topicName);
+        assertTrue(future.isDone());
+        assertTrue(future.get().isPresent());
+        Topic topic = future.get().get();
+        KeySharedMode keySharedMode = getKeySharedModeOfSubscription(topic, subName);
+        assertNotNull(keySharedMode);
+        assertEquals(keySharedMode, KeySharedMode.AUTO_SPLIT);
+
+        consumer1.close();
+
+        consumer1 = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .consumerName("second-consumer")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 65535)))
+                .cryptoKeyReader(new EncKeyReader())
+                .subscribe();
+
+        future = pulsar.getBrokerService().getTopicIfExists(topicName);
+        assertTrue(future.isDone());
+        assertTrue(future.get().isPresent());
+        topic = future.get().get();
+        keySharedMode = getKeySharedModeOfSubscription(topic, subName);
+        assertNotNull(keySharedMode);
+        assertEquals(keySharedMode, KeySharedMode.STICKY);
+        consumer1.close();
+    }
+
+    private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String subscription) {
+        if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) {
+            return ((PersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription)
+                    .getDispatcher()).getKeySharedMode();
+        } else if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.non_persistent)) {
+            return ((NonPersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription)
+                    .getDispatcher()).getKeySharedMode();
+        }
+        return null;
+    }
+
     private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException {
         return pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)