You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/04/27 13:44:18 UTC

[pulsar] branch branch-2.10 updated: [enh][broker] Support properties on NonDurable subscriptions (PIP-105) (#15345)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new b125c619feb [enh][broker] Support properties on NonDurable subscriptions (PIP-105) (#15345)
b125c619feb is described below

commit b125c619febfe462f117bbfc6cd76fd45f5e5a0e
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Wed Apr 27 15:42:47 2022 +0200

    [enh][broker] Support properties on NonDurable subscriptions (PIP-105) (#15345)
    
    (cherry picked from commit 95b7040d58492b385fb9cf54c5611908424488e3)
---
 .../broker/service/persistent/PersistentTopic.java | 15 +++-
 .../broker/admin/CreateSubscriptionTest.java       | 98 ++++++++++++++++++----
 2 files changed, 91 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e30049a098d..3914278ae7f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -754,7 +754,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
                             replicatedSubscriptionState, subscriptionProperties)
                     : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
-                    startMessageRollbackDurationSec, readCompacted);
+                    startMessageRollbackDurationSec, readCompacted, subscriptionProperties);
 
             CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
                 Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
@@ -900,8 +900,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
     private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName,
             MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec,
-            boolean isReadCompacted) {
-        log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);
+            boolean isReadCompacted, Map<String, String> subscriptionProperties) {
+        log.info("[{}][{}] Creating non-durable subscription at msg id {} - {}",
+                topic, subscriptionName, startMessageId, subscriptionProperties);
 
         CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
         if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
@@ -939,7 +940,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     return FutureUtil.failedFuture(e);
                 }
 
-                subscription = new PersistentSubscription(this, subscriptionName, cursor, false);
+                subscription = new PersistentSubscription(this, subscriptionName, cursor, false,
+                        subscriptionProperties);
                 subscriptions.put(subscriptionName, subscription);
             } else {
                 // if subscription exists, check if it's a durable subscription
@@ -947,6 +949,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     return FutureUtil.failedFuture(
                             new NotAllowedException("Durable subscription with the same name already exists."));
                 }
+
+                if (MapUtils.isEmpty(subscription.getSubscriptionProperties())
+                        && MapUtils.isNotEmpty(subscriptionProperties)) {
+                    subscription.getSubscriptionProperties().putAll(subscriptionProperties);
+                }
             }
 
             if (startMessageRollbackDurationSec > 0) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
index 5b43419462a..e0f5e0a77a9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicName;
@@ -58,6 +59,7 @@ import org.awaitility.Awaitility;
 import org.eclipse.jetty.http.HttpStatus;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-admin")
@@ -156,15 +158,20 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         }
     }
 
-    @Test
-    public void testSubscriptionPropertiesStats() throws Exception {
+    @DataProvider(name = "subscriptionMode")
+    public Object[][] subscriptionModeProvider() {
+        return new Object[][] { { SubscriptionMode.Durable }, { SubscriptionMode.NonDurable } };
+    }
+
+    @Test(dataProvider = "subscriptionMode")
+    public void testSubscriptionPropertiesStats(SubscriptionMode subscriptionMode) throws Exception {
         // test non-partitioned topic
         final String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
         admin.topics().createNonPartitionedTopic(topic);
         Map<String, String> map = new HashMap<>();
         map.put("test-topic", "tag1");
         String subName = "my-sub";
-        pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+        pulsarClient.newConsumer().subscriptionMode(subscriptionMode).topic(topic).receiverQueueSize(1)
                 .subscriptionProperties(map).subscriptionName(subName).subscribe();
         TopicStats stats = admin.topics().getStats(topic);
         Map<String, String> subProperties = stats.getSubscriptions().get(subName).getSubscriptionProperties();
@@ -178,7 +185,7 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         pMap.put("topic2", "tag2");
         pMap.put("topic3", "tag3");
         String pSubName = "my-sub-1";
-        pulsarClient.newConsumer().topic(partitionedTopic).receiverQueueSize(1)
+        pulsarClient.newConsumer().subscriptionMode(subscriptionMode).topic(partitionedTopic).receiverQueueSize(1)
                 .subscriptionProperties(pMap).subscriptionName(pSubName).subscribe();
 
         PartitionedTopicStats pStats = admin.topics().getPartitionedStats(partitionedTopic, false);
@@ -192,16 +199,17 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         assertEquals(pSubPropForPerPartition, pMap);
     }
 
-    @Test
-    public void addSubscriptionPropertiesTest() throws Exception {
+    @Test(dataProvider = "subscriptionMode")
+    public void addSubscriptionPropertiesTest(SubscriptionMode subscriptionMode) throws Exception {
         String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
         admin.topics().createNonPartitionedTopic(topic);
         Map<String, String> map = new HashMap<>();
         map.put("1", "1");
         map.put("2", "2");
         String subName = "my-sub";
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
-                .subscriptionProperties(map).subscriptionName(subName).subscribe();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .subscriptionMode(subscriptionMode)
+                .topic(topic).receiverQueueSize(1).subscriptionProperties(map).subscriptionName(subName).subscribe();
         PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
         Map<String, String> properties = subscription.getSubscriptionProperties();
@@ -234,7 +242,8 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         // restart broker, consumer use old properties
         restartBroker();
 
-        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic)
+                .subscriptionMode(subscriptionMode).receiverQueueSize(1)
                 .subscriptionProperties(map).subscriptionName(subName).subscribe();
         PersistentSubscription subscription2 = (PersistentSubscription) pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
@@ -251,7 +260,8 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         Map<String, String> map3 = new HashMap<>();
         map3.put("3", "3");
         map3.put("4", "4");
-        Consumer<byte[]> consumer3 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+        Consumer<byte[]> consumer3 = pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode)
+                .receiverQueueSize(1)
                 .subscriptionProperties(map3).subscriptionName(subName).subscribe();
         Map<String, String> properties3 = subscription.getSubscriptionProperties();
         assertTrue(properties3.containsKey("1"));
@@ -262,7 +272,8 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
 
         //restart and create a new consumer with new properties, the new properties should be updated
         restartBroker();
-        Consumer<byte[]> consumer4 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+        Consumer<byte[]> consumer4 = pulsarClient.newConsumer().subscriptionMode(subscriptionMode)
+                .topic(topic).receiverQueueSize(1)
                 .subscriptionProperties(map3).subscriptionName(subName).subscribe();
         PersistentSubscription subscription4 = (PersistentSubscription) pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
@@ -273,21 +284,29 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         assertEquals(properties4.get("4"), "4");
         consumer4.close();
 
-        //consumer subscribe without subscriptionProperties set, it will get the old one
-        consumer4 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+
+        // a consumer that subscribes without subscriptionProperties keeps the old ones on a Durable subscription,
+        // but gets the new (empty) set on a NonDurable subscription,
+        // so for NonDurable subscriptions you must always reconnect with the same properties
+        consumer4 = pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode).receiverQueueSize(1)
                 .subscriptionName(subName).subscribe();
         subscription4 = (PersistentSubscription) pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
         properties4 = subscription4.getSubscriptionProperties();
-        assertTrue(properties4.containsKey("3"));
-        assertTrue(properties4.containsKey("4"));
-        assertEquals(properties4.get("3"), "3");
-        assertEquals(properties4.get("4"), "4");
+        if (subscriptionMode == SubscriptionMode.Durable) {
+            assertTrue(properties4.containsKey("3"));
+            assertTrue(properties4.containsKey("4"));
+            assertEquals(properties4.get("3"), "3");
+            assertEquals(properties4.get("4"), "4");
+        } else {
+            assertTrue(properties4.isEmpty());
+        }
         consumer4.close();
 
         //restart broker, it won't get any properties
         restartBroker();
-        consumer4 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+        consumer4 = pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode)
+                .receiverQueueSize(1)
                 .subscriptionName(subName).subscribe();
         subscription4 = (PersistentSubscription) pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
@@ -297,6 +316,7 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         //restart broker and create a new consumer with new properties, the properties will be updated
         restartBroker();
         consumer4 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+                .subscriptionMode(subscriptionMode)
                 .subscriptionProperties(map)
                 .subscriptionName(subName).subscribe();
         PersistentSubscription subscription5 = (PersistentSubscription) pulsar.getBrokerService()
@@ -309,6 +329,48 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         consumer4.close();
 
 
+        String subNameShared = "my-sub-shared";
+        Map<String, String> mapShared = new HashMap<>();
+        // open two consumers with a Shared Subscription
+        Consumer consumerShared1 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+                .subscriptionMode(subscriptionMode)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionProperties(mapShared)
+                .subscriptionName(subNameShared).subscribe();
+        PersistentSubscription subscriptionShared = (PersistentSubscription) pulsar.getBrokerService()
+                .getTopicReference(topic).get().getSubscription(subNameShared);
+        properties = subscriptionShared.getSubscriptionProperties();
+        assertEquals(properties, mapShared);
+
+        // add a new consumer, the properties are updated because they were empty
+        mapShared = new HashMap<>();
+        mapShared.put("6", "7");
+        mapShared.put("8", "9");
+        Consumer consumerShared2 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+                .subscriptionMode(subscriptionMode)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionProperties(mapShared)
+                .subscriptionName(subNameShared).subscribe();
+
+        properties = subscriptionShared.getSubscriptionProperties();
+        assertEquals(properties, mapShared);
+
+        // add a third consumer, the properties are NOT updated because they are not empty
+        Map<String, String> mapShared2 = new HashMap<>();
+        mapShared2.put("10", "11");
+        Consumer consumerShared3 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+                .subscriptionMode(subscriptionMode)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionProperties(mapShared2)
+                .subscriptionName(subNameShared).subscribe();
+
+        properties = subscriptionShared.getSubscriptionProperties();
+        // verify that the properties are not updated
+        assertEquals(properties, mapShared);
+
+        consumerShared1.close();
+        consumerShared2.close();
+        consumerShared3.close();
     }
 
     @Test