You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/04/27 13:42:57 UTC
[pulsar] branch master updated: [enh][broker] Support properties on NonDurable subscriptions (PIP-105) (#15345)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 95b7040d584 [enh][broker] Support properties on NonDurable subscriptions (PIP-105) (#15345)
95b7040d584 is described below
commit 95b7040d58492b385fb9cf54c5611908424488e3
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Wed Apr 27 15:42:47 2022 +0200
[enh][broker] Support properties on NonDurable subscriptions (PIP-105) (#15345)
---
.../broker/service/persistent/PersistentTopic.java | 15 +++-
.../broker/admin/CreateSubscriptionTest.java | 98 ++++++++++++++++++----
2 files changed, 91 insertions(+), 22 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bfe0b644109..7b19181fcd1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -779,7 +779,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
replicatedSubscriptionState, subscriptionProperties)
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
- startMessageRollbackDurationSec, readCompacted);
+ startMessageRollbackDurationSec, readCompacted, subscriptionProperties);
CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
@@ -925,8 +925,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName,
MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec,
- boolean isReadCompacted) {
- log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);
+ boolean isReadCompacted, Map<String, String> subscriptionProperties) {
+ log.info("[{}][{}] Creating non-durable subscription at msg id {} - {}",
+ topic, subscriptionName, startMessageId, subscriptionProperties);
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
@@ -964,7 +965,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return FutureUtil.failedFuture(e);
}
- subscription = new PersistentSubscription(this, subscriptionName, cursor, false);
+ subscription = new PersistentSubscription(this, subscriptionName, cursor, false,
+ subscriptionProperties);
subscriptions.put(subscriptionName, subscription);
} else {
// if subscription exists, check if it's a durable subscription
@@ -972,6 +974,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return FutureUtil.failedFuture(
new NotAllowedException("Durable subscription with the same name already exists."));
}
+
+ if (MapUtils.isEmpty(subscription.getSubscriptionProperties())
+ && MapUtils.isNotEmpty(subscriptionProperties)) {
+ subscription.getSubscriptionProperties().putAll(subscriptionProperties);
+ }
}
if (startMessageRollbackDurationSec > 0) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
index 5b43419462a..e0f5e0a77a9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
@@ -58,6 +59,7 @@ import org.awaitility.Awaitility;
import org.eclipse.jetty.http.HttpStatus;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-admin")
@@ -156,15 +158,20 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
}
}
- @Test
- public void testSubscriptionPropertiesStats() throws Exception {
+ @DataProvider(name = "subscriptionMode")
+ public Object[][] subscriptionModeProvider() {
+ return new Object[][] { { SubscriptionMode.Durable }, { SubscriptionMode.NonDurable } };
+ }
+
+ @Test(dataProvider = "subscriptionMode")
+ public void testSubscriptionPropertiesStats(SubscriptionMode subscriptionMode) throws Exception {
// test non-partitioned topic
final String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
Map<String, String> map = new HashMap<>();
map.put("test-topic", "tag1");
String subName = "my-sub";
- pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ pulsarClient.newConsumer().subscriptionMode(subscriptionMode).topic(topic).receiverQueueSize(1)
.subscriptionProperties(map).subscriptionName(subName).subscribe();
TopicStats stats = admin.topics().getStats(topic);
Map<String, String> subProperties = stats.getSubscriptions().get(subName).getSubscriptionProperties();
@@ -178,7 +185,7 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
pMap.put("topic2", "tag2");
pMap.put("topic3", "tag3");
String pSubName = "my-sub-1";
- pulsarClient.newConsumer().topic(partitionedTopic).receiverQueueSize(1)
+ pulsarClient.newConsumer().subscriptionMode(subscriptionMode).topic(partitionedTopic).receiverQueueSize(1)
.subscriptionProperties(pMap).subscriptionName(pSubName).subscribe();
PartitionedTopicStats pStats = admin.topics().getPartitionedStats(partitionedTopic, false);
@@ -192,16 +199,17 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
assertEquals(pSubPropForPerPartition, pMap);
}
- @Test
- public void addSubscriptionPropertiesTest() throws Exception {
+ @Test(dataProvider = "subscriptionMode")
+ public void addSubscriptionPropertiesTest(SubscriptionMode subscriptionMode) throws Exception {
String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
Map<String, String> map = new HashMap<>();
map.put("1", "1");
map.put("2", "2");
String subName = "my-sub";
- Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
- .subscriptionProperties(map).subscriptionName(subName).subscribe();
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .subscriptionMode(subscriptionMode)
+ .topic(topic).receiverQueueSize(1).subscriptionProperties(map).subscriptionName(subName).subscribe();
PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
Map<String, String> properties = subscription.getSubscriptionProperties();
@@ -234,7 +242,8 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
// restart broker, consumer use old properties
restartBroker();
- Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic)
+ .subscriptionMode(subscriptionMode).receiverQueueSize(1)
.subscriptionProperties(map).subscriptionName(subName).subscribe();
PersistentSubscription subscription2 = (PersistentSubscription) pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
@@ -251,7 +260,8 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
Map<String, String> map3 = new HashMap<>();
map3.put("3", "3");
map3.put("4", "4");
- Consumer<byte[]> consumer3 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ Consumer<byte[]> consumer3 = pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode)
+ .receiverQueueSize(1)
.subscriptionProperties(map3).subscriptionName(subName).subscribe();
Map<String, String> properties3 = subscription.getSubscriptionProperties();
assertTrue(properties3.containsKey("1"));
@@ -262,7 +272,8 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
//restart and create a new consumer with new properties, the new properties should be updated
restartBroker();
- Consumer<byte[]> consumer4 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ Consumer<byte[]> consumer4 = pulsarClient.newConsumer().subscriptionMode(subscriptionMode)
+ .topic(topic).receiverQueueSize(1)
.subscriptionProperties(map3).subscriptionName(subName).subscribe();
PersistentSubscription subscription4 = (PersistentSubscription) pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
@@ -273,21 +284,29 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
assertEquals(properties4.get("4"), "4");
consumer4.close();
- //consumer subscribe without subscriptionProperties set, it will get the old one
- consumer4 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+
+ // a consumer that subscribes without subscriptionProperties keeps the old ones on a Durable subscription,
+ // but gets the new (empty) set on a NonDurable subscription,
+ // so for NonDurable subscriptions you must always reconnect with the same properties
+ consumer4 = pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode).receiverQueueSize(1)
.subscriptionName(subName).subscribe();
subscription4 = (PersistentSubscription) pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
properties4 = subscription4.getSubscriptionProperties();
- assertTrue(properties4.containsKey("3"));
- assertTrue(properties4.containsKey("4"));
- assertEquals(properties4.get("3"), "3");
- assertEquals(properties4.get("4"), "4");
+ if (subscriptionMode == SubscriptionMode.Durable) {
+ assertTrue(properties4.containsKey("3"));
+ assertTrue(properties4.containsKey("4"));
+ assertEquals(properties4.get("3"), "3");
+ assertEquals(properties4.get("4"), "4");
+ } else {
+ assertTrue(properties4.isEmpty());
+ }
consumer4.close();
//restart broker, it won't get any properties
restartBroker();
- consumer4 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ consumer4 = pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode)
+ .receiverQueueSize(1)
.subscriptionName(subName).subscribe();
subscription4 = (PersistentSubscription) pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
@@ -297,6 +316,7 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
//restart broker and create a new consumer with new properties, the properties will be updated
restartBroker();
consumer4 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ .subscriptionMode(subscriptionMode)
.subscriptionProperties(map)
.subscriptionName(subName).subscribe();
PersistentSubscription subscription5 = (PersistentSubscription) pulsar.getBrokerService()
@@ -309,6 +329,48 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
consumer4.close();
+ String subNameShared = "my-sub-shared";
+ Map<String, String> mapShared = new HashMap<>();
+ // open two consumers with a Shared Subscription
+ Consumer consumerShared1 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ .subscriptionMode(subscriptionMode)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionProperties(mapShared)
+ .subscriptionName(subNameShared).subscribe();
+ PersistentSubscription subscriptionShared = (PersistentSubscription) pulsar.getBrokerService()
+ .getTopicReference(topic).get().getSubscription(subNameShared);
+ properties = subscriptionShared.getSubscriptionProperties();
+ assertEquals(properties, mapShared);
+
+ // add a new consumer, the properties are updated because they were empty
+ mapShared = new HashMap<>();
+ mapShared.put("6", "7");
+ mapShared.put("8", "9");
+ Consumer consumerShared2 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ .subscriptionMode(subscriptionMode)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionProperties(mapShared)
+ .subscriptionName(subNameShared).subscribe();
+
+ properties = subscriptionShared.getSubscriptionProperties();
+ assertEquals(properties, mapShared);
+
+ // add a third consumer, the properties are NOT updated because they are not empty
+ Map<String, String> mapShared2 = new HashMap<>();
+ mapShared2.put("10", "11");
+ Consumer consumerShared3 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ .subscriptionMode(subscriptionMode)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionProperties(mapShared2)
+ .subscriptionName(subNameShared).subscribe();
+
+ properties = subscriptionShared.getSubscriptionProperties();
+ // verify that the properties are not updated
+ assertEquals(properties, mapShared);
+
+ consumerShared1.close();
+ consumerShared2.close();
+ consumerShared3.close();
}
@Test