You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/04/25 13:42:31 UTC

[pulsar] branch master updated: PIP-34 Add flag to enable or disable Key_Shared subscription. (#4120)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 508fe22  PIP-34 Add flag to enable or disable Key_Shared subscription. (#4120)
508fe22 is described below

commit 508fe22c2e44c4d9aef5e8a7fa9f914b96ed16db
Author: lipenghui <co...@gmail.com>
AuthorDate: Thu Apr 25 21:42:26 2019 +0800

    PIP-34 Add flag to enable or disable Key_Shared subscription. (#4120)
    
    ### Motivation
    
    Add a broker-level flag to enable or disable Key_Shared subscription; it is enabled by default.
    
    ### Modifications
    
    Add a flag named `subscriptionKeySharedEnable`.
    Add documentation stating that Key_Shared subscription is a beta feature.
---
 conf/broker.conf                                       |  3 +++
 .../org/apache/pulsar/broker/ServiceConfiguration.java |  7 +++++++
 .../broker/service/persistent/PersistentTopic.java     |  8 ++++++++
 .../pulsar/client/api/KeySharedSubscriptionTest.java   | 18 +++++++++++++++---
 site2/docs/concepts-messaging.md                       |  2 ++
 5 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 8a7da2b..856d173 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -107,6 +107,9 @@ subscriptionRedeliveryTrackerEnabled=true
 # How frequently to proactively check and purge expired subscription
 subscriptionExpiryCheckIntervalInMinutes=5
 
+# Enable Key_Shared subscription (default is enabled)
+subscriptionKeySharedEnable=true
+
 # Set the default behavior for message deduplication in the broker
 # This can be overridden per-namespace. If enabled, broker will reject
 # messages that were already stored in the topic
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 694b824..b600ac7 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -268,6 +268,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
 
     @FieldContext(
         category = CATEGORY_POLICIES,
+        dynamic = true,
+        doc = "Enable Key_Shared subscription (default is enabled)"
+    )
+    private boolean subscriptionKeySharedEnable = true;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
         doc = "Set the default behavior for message deduplication in the broker.\n\n"
             + "This can be overridden per-namespace. If enabled, broker will reject"
             + " messages that were already stored in the topic"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e1a0509..19bc174 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -498,6 +498,14 @@ public class PersistentTopic implements Topic, AddEntryCallback {
                     new NotAllowedException("readCompacted only allowed on failover or exclusive subscriptions"));
             return future;
         }
+
+        if (subType == SubType.Key_Shared
+            && !brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) {
+            future.completeExceptionally(
+                new NotAllowedException("Key_Shared subscription is disabled by broker.")
+            );
+            return future;
+        }
         if (isBlank(subscriptionName)) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Empty subscription name", topic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index e5f1edc..14c9693 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -35,7 +35,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
     private static final Logger log = LoggerFactory.getLogger(KeySharedSubscriptionTest.class);
 
-
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
@@ -51,7 +50,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
     @Test
     public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
-
+        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "persistent://public/default/key_shared";
 
         Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
@@ -153,7 +152,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
     @Test
     public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
-
+        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "persistent://public/default/key_shared_none_key";
 
         Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
@@ -251,6 +250,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
     @Test
     public void testOrderingKeyWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "persistent://public/default/key_shared_ordering_key";
 
         Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
@@ -327,4 +327,16 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         Assert.assertEquals(consumer2ExpectMessages, consumer2Received);
         Assert.assertEquals(consumer3ExpectMessages, consumer3Received);
     }
+
+    @Test(expectedExceptions = PulsarClientException.class)
+    public void testDisableKeySharedSubscription() throws PulsarClientException {
+        this.conf.setSubscriptionKeySharedEnable(false);
+        String topic = "persistent://public/default/key_shared_disabled";
+        pulsarClient.newConsumer()
+            .topic(topic)
+            .subscriptionName("key_shared")
+            .subscriptionType(SubscriptionType.Key_Shared)
+            .ackTimeout(10, TimeUnit.SECONDS)
+            .subscribe();
+    }
 }
diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index 9714596..5bda2e9 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -150,6 +150,8 @@ In *Key_Shared* mode, multiple consumers can attach to the same subscription. Me
 
 ![Key_Shared subscriptions](assets/pulsar-key-shared-subscriptions.png)
 
+**Key_Shared subscription is a beta feature. You can disable it in `broker.conf`.**
+
 ## Multi-topic subscriptions
 
 When a consumer subscribes to a Pulsar topic, by default it subscribes to one specific topic, such as `persistent://public/default/my-topic`. As of Pulsar version 1.23.0-incubating, however, Pulsar consumers can simultaneously subscribe to multiple topics. You can define a list of topics in two ways: