You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/04/25 13:42:31 UTC
[pulsar] branch master updated: PIP-34 Add flag to enable or
disable Key_Shared subscription. (#4120)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 508fe22 PIP-34 Add flag to enable or disable Key_Shared subscription. (#4120)
508fe22 is described below
commit 508fe22c2e44c4d9aef5e8a7fa9f914b96ed16db
Author: lipenghui <co...@gmail.com>
AuthorDate: Thu Apr 25 21:42:26 2019 +0800
PIP-34 Add flag to enable or disable Key_Shared subscription. (#4120)
### Motivation
Add a broker-level flag to enable or disable Key_Shared subscription; it is enabled by default.
### Modifications
Add a flag named `subscriptionKeySharedEnable`
Add documentation stating that Key_Shared subscription is a beta feature.
---
conf/broker.conf | 3 +++
.../org/apache/pulsar/broker/ServiceConfiguration.java | 7 +++++++
.../broker/service/persistent/PersistentTopic.java | 8 ++++++++
.../pulsar/client/api/KeySharedSubscriptionTest.java | 18 +++++++++++++++---
site2/docs/concepts-messaging.md | 2 ++
5 files changed, 35 insertions(+), 3 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 8a7da2b..856d173 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -107,6 +107,9 @@ subscriptionRedeliveryTrackerEnabled=true
# How frequently to proactively check and purge expired subscription
subscriptionExpiryCheckIntervalInMinutes=5
+# Enable Key_Shared subscription (default is enabled)
+subscriptionKeySharedEnable=true
+
# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 694b824..b600ac7 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -268,6 +268,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(
category = CATEGORY_POLICIES,
+ dynamic = true,
+ doc = "Enable Key_Shared subscription (default is enabled)"
+ )
+ private boolean subscriptionKeySharedEnable = true;
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
doc = "Set the default behavior for message deduplication in the broker.\n\n"
+ "This can be overridden per-namespace. If enabled, broker will reject"
+ " messages that were already stored in the topic"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e1a0509..19bc174 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -498,6 +498,14 @@ public class PersistentTopic implements Topic, AddEntryCallback {
new NotAllowedException("readCompacted only allowed on failover or exclusive subscriptions"));
return future;
}
+
+ if (subType == SubType.Key_Shared
+ && !brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) {
+ future.completeExceptionally(
+ new NotAllowedException("Key_Shared subscription is disabled by broker.")
+ );
+ return future;
+ }
if (isBlank(subscriptionName)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Empty subscription name", topic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index e5f1edc..14c9693 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -35,7 +35,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(KeySharedSubscriptionTest.class);
-
@BeforeMethod
@Override
protected void setup() throws Exception {
@@ -51,7 +50,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@Test
public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
-
+ this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared";
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
@@ -153,7 +152,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@Test
public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
-
+ this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared_none_key";
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
@@ -251,6 +250,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@Test
public void testOrderingKeyWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+ this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared_ordering_key";
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
@@ -327,4 +327,16 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
Assert.assertEquals(consumer2ExpectMessages, consumer2Received);
Assert.assertEquals(consumer3ExpectMessages, consumer3Received);
}
+
+ @Test(expectedExceptions = PulsarClientException.class)
+ public void testDisableKeySharedSubscription() throws PulsarClientException {
+ this.conf.setSubscriptionKeySharedEnable(false);
+ String topic = "persistent://public/default/key_shared_disabled";
+ pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("key_shared")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .ackTimeout(10, TimeUnit.SECONDS)
+ .subscribe();
+ }
}
diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index 9714596..5bda2e9 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -150,6 +150,8 @@ In *Key_Shared* mode, multiple consumers can attach to the same subscription. Me
![Key_Shared subscriptions](assets/pulsar-key-shared-subscriptions.png)
+**Key_Shared subscription is a beta feature. You can disable it in `broker.conf`.**
+
## Multi-topic subscriptions
When a consumer subscribes to a Pulsar topic, by default it subscribes to one specific topic, such as `persistent://public/default/my-topic`. As of Pulsar version 1.23.0-incubating, however, Pulsar consumers can simultaneously subscribe to multiple topics. You can define a list of topics in two ways: