You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/11 14:27:22 UTC

[pulsar] 06/10: Fix wrong key-hash selector used for new consumers after all the previous consumers disconnected (#12035)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 141f2508f9543fcd6bfa368fe9a2cfe8e71ab93f
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Sep 17 14:35:02 2021 +0800

    Fix wrong key-hash selector used for new consumers after all the previous consumers disconnected (#12035)
    
    The issue occurs when all the previous consumers have disconnected and new consumers then connect
    to the topic with a different key_shared policy.
    
    The root cause is that the previous dispatcher is still used after the key_shared policy has changed,
    so the fix is to create a new dispatcher when a new consumer connects with a different key_shared policy.
    
    (cherry picked from commit 3a4755f50ef46c3d94ce9629478941d5224cb800)
---
 ...istentStickyKeyDispatcherMultipleConsumers.java | 42 +++++++++++++
 .../nonpersistent/NonPersistentSubscription.java   | 36 +++--------
 ...istentStickyKeyDispatcherMultipleConsumers.java | 10 ++-
 .../service/persistent/PersistentSubscription.java | 10 ++-
 .../broker/service/PersistentTopicE2ETest.java     |  6 +-
 .../client/api/KeySharedSubscriptionTest.java      | 71 +++++++++++++++++++++-
 6 files changed, 138 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index c1c0227..68ff404 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.FastThreadLocal;
 
 import java.util.ArrayList;
@@ -26,21 +27,58 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
+import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 
 public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {
 
     private final StickyKeyConsumerSelector selector;
+    private final PulsarApi.KeySharedMode keySharedMode;
 
     public NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription,
+                                                             PulsarApi.KeySharedMeta ksm) {
+        super(topic, subscription);
+        this.keySharedMode = ksm.getKeySharedMode();
+        switch (this.keySharedMode) {
+            case STICKY:
+                this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
+                break;
+
+            case AUTO_SPLIT:
+            default:
+                ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
+                if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
+                    this.selector = new ConsistentHashingStickyKeyConsumerSelector(
+                            conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
+                } else {
+                    this.selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
+                }
+                break;
+        }
+    }
+
+    @VisibleForTesting
+    NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription,
                                                              StickyKeyConsumerSelector selector) {
         super(topic, subscription);
+        if (selector instanceof HashRangeExclusiveStickyKeyConsumerSelector) {
+            keySharedMode = PulsarApi.KeySharedMode.STICKY;
+        } else if (selector instanceof ConsistentHashingStickyKeyConsumerSelector
+                || selector instanceof HashRangeAutoSplitStickyKeyConsumerSelector) {
+            keySharedMode = PulsarApi.KeySharedMode.AUTO_SPLIT;
+        } else {
+            keySharedMode = null;
+        }
         this.selector = selector;
     }
 
@@ -105,4 +143,8 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
             TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
         }
     }
+
+    public PulsarApi.KeySharedMode getKeySharedMode() {
+        return keySharedMode;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 7aa26fa..7b0600b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -28,7 +28,6 @@ import com.google.common.base.MoreObjects;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -36,12 +35,9 @@ import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyE
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
-import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
@@ -135,30 +131,14 @@ public class NonPersistentSubscription implements Subscription {
                 }
                 break;
             case Key_Shared:
-                if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
+                KeySharedMeta ksm = consumer.getKeySharedMeta() != null ? consumer.getKeySharedMeta()
+                        : KeySharedMeta.getDefaultInstance();
+                PulsarApi.KeySharedMode keySharedMode = ksm.getKeySharedMode();
+                if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
+                        || ((NonPersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
+                        != keySharedMode) {
                     previousDispatcher = dispatcher;
-                    KeySharedMeta ksm = consumer.getKeySharedMeta() != null ? consumer.getKeySharedMeta() : KeySharedMeta.getDefaultInstance();
-
-                    switch (ksm.getKeySharedMode()) {
-                        case STICKY:
-                            dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
-                                    new HashRangeExclusiveStickyKeyConsumerSelector());
-                            break;
-
-                        case AUTO_SPLIT:
-                        default:
-                            StickyKeyConsumerSelector selector;
-                            ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
-                            if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
-                                selector = new ConsistentHashingStickyKeyConsumerSelector(
-                                        conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
-                            } else {
-                                selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
-                            }
-
-                            dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, selector);
-                            break;
-                    }
+                    this.dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, ksm);
                 }
                 break;
             default:
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 596c9c7..51bec23 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelec
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
 import org.slf4j.Logger;
@@ -59,6 +60,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
     private final StickyKeyConsumerSelector selector;
 
     private boolean isDispatcherStuckOnReplays = false;
+    private final PulsarApi.KeySharedMode keySharedMode;
 
     /**
      * When a consumer joins, it will be added to this map with the current read position.
@@ -78,8 +80,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>();
         this.stuckConsumers = new HashSet<>();
         this.nextStuckConsumers = new HashSet<>();
-
-        switch (ksm.getKeySharedMode()) {
+        this.keySharedMode = ksm.getKeySharedMode();
+        switch (this.keySharedMode) {
         case AUTO_SPLIT:
             if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
                 selector = new ConsistentHashingStickyKeyConsumerSelector(
@@ -401,6 +403,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true);
     }
 
+    public PulsarApi.KeySharedMode getKeySharedMode() {
+        return this.keySharedMode;
+    }
+
     public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
         return recentlyJoinedConsumers;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index c39d4bf..5350baf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
@@ -212,10 +213,13 @@ public class PersistentSubscription implements Subscription {
                 }
                 break;
             case Key_Shared:
-                if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
+                KeySharedMeta ksm = consumer.getKeySharedMeta() != null ? consumer.getKeySharedMeta()
+                        : KeySharedMeta.getDefaultInstance();
+                PulsarApi.KeySharedMode keySharedMode = ksm.getKeySharedMode();
+                if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
+                        || ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
+                        != keySharedMode) {
                     previousDispatcher = dispatcher;
-                    KeySharedMeta ksm = consumer.getKeySharedMeta() != null ? consumer.getKeySharedMeta()
-                            : KeySharedMeta.getDefaultInstance();
                     dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
                             topic.getBrokerService().getPulsar().getConfiguration(), ksm);
                 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index f9c7e18..ba35185 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1701,7 +1701,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
                 .producerName("xxx")
                 .create();
 
-        assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+        assertEquals(admin.topics().getStats(topicName).publishers.size(), 1);
 
         for (int i =0; i < 5; i++) {
             try {
@@ -1714,7 +1714,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
                 // Expected
             }
 
-            assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+            assertEquals(admin.topics().getStats(topicName).publishers.size(), 1);
         }
 
         // Try from different connection
@@ -1733,6 +1733,6 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
             // Expected
         }
 
-        assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+        assertEquals(admin.topics().getStats(topicName).publishers.size(), 1);
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index f8dad41..7a6f04a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -36,6 +37,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -44,9 +46,13 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.awaitility.Awaitility;
@@ -90,7 +96,15 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         };
     }
 
-    @BeforeMethod
+    @DataProvider(name = "topicDomain")
+    public Object[][] topicDomainProvider() {
+        return new Object[][] {
+                { "persistent" },
+                { "non-persistent" }
+        };
+    }
+
+    @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
         super.internalSetup();
@@ -937,6 +951,61 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         client.close();
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testSelectorChangedAfterAllConsumerDisconnected(String topicDomain) throws PulsarClientException,
+            ExecutionException, InterruptedException {
+        final String topicName = TopicName.get(topicDomain, "public", "default",
+                "testSelectorChangedAfterAllConsumerDisconnected" + UUID.randomUUID()).toString();
+
+        final String subName = "my-sub";
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .consumerName("first-consumer")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
+                .subscribe();
+
+        CompletableFuture<Optional<Topic>> future = pulsar.getBrokerService().getTopicIfExists(topicName);
+        assertTrue(future.isDone());
+        assertTrue(future.get().isPresent());
+        Topic topic = future.get().get();
+        PulsarApi.KeySharedMode keySharedMode = getKeySharedModeOfSubscription(topic, subName);
+        assertNotNull(keySharedMode);
+        assertEquals(keySharedMode, PulsarApi.KeySharedMode.AUTO_SPLIT);
+
+        consumer1.close();
+
+        consumer1 = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .consumerName("second-consumer")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 65535)))
+                .subscribe();
+
+        future = pulsar.getBrokerService().getTopicIfExists(topicName);
+        assertTrue(future.isDone());
+        assertTrue(future.get().isPresent());
+        topic = future.get().get();
+        keySharedMode = getKeySharedModeOfSubscription(topic, subName);
+        assertNotNull(keySharedMode);
+        assertEquals(keySharedMode, PulsarApi.KeySharedMode.STICKY);
+        consumer1.close();
+    }
+
+    private PulsarApi.KeySharedMode getKeySharedModeOfSubscription(Topic topic, String subscription) {
+        if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) {
+            return ((PersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription)
+                    .getDispatcher()).getKeySharedMode();
+        } else if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.non_persistent)) {
+            return ((NonPersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription)
+                    .getDispatcher()).getKeySharedMode();
+        }
+        return null;
+    }
+
     private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException {
         return pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)