You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/05/23 15:56:37 UTC

[pulsar] branch branch-2.10 updated: [enh] Issue 15455: Pulsar Admin: create subscription with Properties (PIP-105) (#15503) (#15681)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new c7f9ffdf229 [enh] Issue 15455: Pulsar Admin: create subscription with Properties (PIP-105) (#15503) (#15681)
c7f9ffdf229 is described below

commit c7f9ffdf22933f74f255e71fb512ee5c48fa4f45
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Mon May 23 17:56:32 2022 +0200

    [enh] Issue 15455: Pulsar Admin: create subscription with Properties (PIP-105) (#15503) (#15681)
    
    (cherry picked from commit e2fa1898f7e9a38b1743b2abf21c87eafe126ee2)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 19 +++--
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  3 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  5 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  2 +-
 .../apache/pulsar/broker/service/Subscription.java |  2 +
 .../org/apache/pulsar/broker/service/Topic.java    |  2 +-
 .../nonpersistent/NonPersistentSubscription.java   | 10 ++-
 .../service/nonpersistent/NonPersistentTopic.java  | 14 ++--
 .../service/persistent/PersistentSubscription.java |  1 +
 .../broker/service/persistent/PersistentTopic.java |  7 +-
 .../ReplicatedSubscriptionsController.java         |  2 +-
 .../broker/admin/AdminApiSubscriptionTest.java     | 69 +++++++++++++++
 .../pulsar/broker/transaction/TransactionTest.java |  2 +-
 .../org/apache/pulsar/client/admin/Topics.java     | 97 +++++++++++++++++++++-
 .../pulsar/client/admin/internal/TopicsImpl.java   | 24 +++---
 .../pulsar/admin/cli/PulsarAdminToolTest.java      | 25 +++++-
 .../java/org/apache/pulsar/admin/cli/CmdBase.java  | 20 +++++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 16 +---
 .../pulsar/admin/cli/CmdPersistentTopics.java      |  9 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 43 ++--------
 .../apache/pulsar/client/impl/ResetCursorData.java | 10 ++-
 .../pulsar/tests/integration/cli/CLITest.java      | 25 ++++++
 22 files changed, 313 insertions(+), 94 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b286a6000dc..4228ac6254d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2114,7 +2114,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalCreateSubscription(AsyncResponse asyncResponse, String subscriptionName,
-            MessageIdImpl messageId, boolean authoritative, boolean replicated) {
+            MessageIdImpl messageId, boolean authoritative, boolean replicated, Map<String, String> properties) {
         CompletableFuture<Void> ret;
         if (topicName.isGlobal()) {
             ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
@@ -2123,12 +2123,12 @@ public class PersistentTopicsBase extends AdminResource {
         }
         ret.thenAccept(__ -> {
             final MessageIdImpl targetMessageId = messageId == null ? (MessageIdImpl) MessageId.latest : messageId;
-            log.info("[{}][{}] Creating subscription {} at message id {}", clientAppId(), topicName, subscriptionName,
-                    targetMessageId);
+            log.info("[{}][{}] Creating subscription {} at message id {} with properties {}", clientAppId(),
+                    topicName, subscriptionName, targetMessageId, properties);
             // If the topic name is a partition name, no need to get partition topic metadata again
             if (topicName.isPartitioned()) {
                 internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
-                        subscriptionName, targetMessageId, authoritative, replicated);
+                        subscriptionName, targetMessageId, authoritative, replicated, properties);
             } else {
                 boolean allowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);
                 getPartitionedTopicMetadataAsync(topicName,
@@ -2146,7 +2146,7 @@ public class PersistentTopicsBase extends AdminResource {
                             try {
                                 pulsar().getAdminClient().topics()
                                         .createSubscriptionAsync(topicNamePartition.toString(),
-                                                subscriptionName, targetMessageId)
+                                                subscriptionName, targetMessageId, false, properties)
                                         .handle((r, ex) -> {
                                             if (ex != null) {
                                                 // fail the operation on unknown exception or
@@ -2200,7 +2200,7 @@ public class PersistentTopicsBase extends AdminResource {
                         });
                     } else {
                         internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
-                                subscriptionName, targetMessageId, authoritative, replicated);
+                                subscriptionName, targetMessageId, authoritative, replicated, properties);
                     }
                 }).exceptionally(ex -> {
                     // If the exception is not redirect exception we need to log it.
@@ -2225,7 +2225,8 @@ public class PersistentTopicsBase extends AdminResource {
 
     private void internalCreateSubscriptionForNonPartitionedTopic(
             AsyncResponse asyncResponse, String subscriptionName,
-            MessageIdImpl targetMessageId, boolean authoritative, boolean replicated) {
+            MessageIdImpl targetMessageId, boolean authoritative, boolean replicated,
+            Map<String, String> properties) {
 
         boolean isAllowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);
 
@@ -2245,7 +2246,7 @@ public class PersistentTopicsBase extends AdminResource {
                 throw new RestException(Status.CONFLICT, "Subscription already exists for topic");
             }
 
-            return topic.createSubscription(subscriptionName, InitialPosition.Latest, replicated);
+            return topic.createSubscription(subscriptionName, InitialPosition.Latest, replicated, properties);
         }).thenCompose(subscription -> {
             // Mark the cursor as "inactive" as it was created without a real consumer connected
             ((PersistentSubscription) subscription).deactivateCursor();
@@ -4004,7 +4005,7 @@ public class PersistentTopicsBase extends AdminResource {
             Subscription sub = topic.getSubscription(subName);
             if (sub == null) {
                 sub = topic.createSubscription(subName,
-                        InitialPosition.Earliest, false).get();
+                        InitialPosition.Earliest, false, null).get();
             }
 
             return checkNotNull(sub);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index fcb71566f6b..283ba0ebccd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -652,7 +652,8 @@ public class PersistentTopics extends PersistentTopicsBase {
                 throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic "
                         + "can only be done through client");
             }
-            internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated);
+            internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated,
+                    null);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 94a2ae7536c..fd5cb75b10b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1410,10 +1410,13 @@ public class PersistentTopics extends PersistentTopicsBase {
                 throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic "
                         + "can only be done through client");
             }
+            Map<String, String> subscriptionProperties = resetCursorData == null ? null :
+                    resetCursorData.getProperties();
             MessageIdImpl messageId = resetCursorData == null ? null :
                     new MessageIdImpl(resetCursorData.getLedgerId(), resetCursorData.getEntryId(),
                             resetCursorData.getPartitionIndex());
-            internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated);
+            internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative,
+                    replicated, subscriptionProperties);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0c102a1198d..3be316691b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1304,7 +1304,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                 }
                                 createInitSubFuture =
                                         topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest,
-                                                false);
+                                                false, null);
                             } else {
                                 createInitSubFuture = CompletableFuture.completedFuture(null);
                             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 06db648c15f..b1ccb4d1eb0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -101,6 +101,8 @@ public interface Subscription {
 
     void addUnAckedMessages(int unAckMessages);
 
+    Map<String, String> getSubscriptionProperties();
+
     default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
         // Default is no-op
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 472bf4efdb8..fc1eadf47e4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -164,7 +164,7 @@ public interface Topic {
     CompletableFuture<Consumer> subscribe(SubscriptionOption option);
 
     CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
-            boolean replicateSubscriptionState);
+            boolean replicateSubscriptionState, Map<String, String> properties);
 
     CompletableFuture<Void> unsubscribe(String subName);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 790725fe8bb..ae49b3623ca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -68,13 +68,15 @@ public class NonPersistentSubscription implements Subscription {
 
     private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
     private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
+    private final Map<String, String> subscriptionProperties;
 
     // If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription.
     private final boolean isDurable;
 
     private KeySharedMode keySharedMode = null;
 
-    public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName, boolean isDurable) {
+    public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName, boolean isDurable,
+                                     Map<String, String> properties) {
         this.topic = topic;
         this.topicName = topic.getName();
         this.subName = subscriptionName;
@@ -82,6 +84,8 @@ public class NonPersistentSubscription implements Subscription {
         IS_FENCED_UPDATER.set(this, FALSE);
         this.lastActive = System.currentTimeMillis();
         this.isDurable = isDurable;
+        this.subscriptionProperties = properties != null
+                ? Collections.unmodifiableMap(properties) : Collections.emptyMap();
     }
 
     @Override
@@ -518,4 +522,8 @@ public class NonPersistentSubscription implements Subscription {
     public void updateLastActive() {
         this.lastActive = System.currentTimeMillis();
     }
+
+    public Map<String, String> getSubscriptionProperties() {
+        return subscriptionProperties;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 61c9779dbcf..82d610c12b7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -249,7 +249,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
                 option.isDurable(), option.getStartMessageId(), option.getMetadata(),
                 option.isReadCompacted(), option.getInitialPosition(),
                 option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(),
-                option.getKeySharedMeta());
+                option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null));
     }
 
     @Override
@@ -262,7 +262,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
                                                  KeySharedMeta keySharedMeta) {
         return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName,
                 isDurable, startMessageId, metadata, readCompacted, initialPosition, resetStartMessageBackInSec,
-                replicateSubscriptionState, keySharedMeta);
+                replicateSubscriptionState, keySharedMeta, null);
     }
 
     private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
@@ -272,7 +272,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
                                                           boolean readCompacted, InitialPosition initialPosition,
                                                           long resetStartMessageBackInSec,
                                                           boolean replicateSubscriptionState,
-                                                          KeySharedMeta keySharedMeta) {
+                                                          KeySharedMeta keySharedMeta,
+                                                          Map<String, String> subscriptionProperties) {
 
         return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
             final CompletableFuture<Consumer> future = new CompletableFuture<>();
@@ -312,7 +313,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
             }
 
             NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
-                    name -> new NonPersistentSubscription(this, subscriptionName, isDurable));
+                    name -> new NonPersistentSubscription(this, subscriptionName, isDurable, subscriptionProperties));
 
             Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
                     false, cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta,
@@ -364,8 +365,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
 
     @Override
     public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
-            boolean replicateSubscriptionState) {
-        return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName, true));
+            boolean replicateSubscriptionState, Map<String, String> properties) {
+        return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName, true,
+                properties));
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index ec77fe69eed..3b23e86f9ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1089,6 +1089,7 @@ public class PersistentSubscription implements Subscription {
         }
     }
 
+    @Override
     public Map<String, String> getSubscriptionProperties() {
         return subscriptionProperties;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6e7efc0a90a..acb965f0429 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -994,9 +994,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
     @Override
     public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
-                                                              boolean replicateSubscriptionState) {
+                                                              boolean replicateSubscriptionState,
+                                                              Map<String, String> subscriptionProperties) {
         return getDurableSubscription(subscriptionName, initialPosition,
-                0 /*avoid reseting cursor*/, replicateSubscriptionState, null);
+                0 /*avoid reseting cursor*/, replicateSubscriptionState, subscriptionProperties);
     }
 
     /**
@@ -1482,7 +1483,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return isCompactionEnabled()
                 // If a topic has a compaction policy setup, we must make sure that the compaction cursor
                 // is pre-created, in order to ensure all the data will be seen by the compactor.
-                ? createSubscription(COMPACTION_SUBSCRIPTION, CommandSubscribe.InitialPosition.Earliest, false)
+                ? createSubscription(COMPACTION_SUBSCRIPTION, CommandSubscribe.InitialPosition.Earliest, false, null)
                         .thenCompose(__ -> CompletableFuture.completedFuture(null))
                 : CompletableFuture.completedFuture(null);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index 2b1ae4ba193..a77ac76be8f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -195,7 +195,7 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
             log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription",
                     topic, update.getSubscriptionName(), updatedMessageId.getLedgerId(), pos);
             topic.createSubscription(update.getSubscriptionName(),
-                    InitialPosition.Latest, true /* replicateSubscriptionState */);
+                    InitialPosition.Latest, true /* replicateSubscriptionState */, null);
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
index 8e4a1d54a3e..8d25c61e1dd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
@@ -19,15 +19,22 @@
 package org.apache.pulsar.broker.admin;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.expectThrows;
+
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 import javax.ws.rs.core.Response;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -106,4 +113,66 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
         assertEquals(exception.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode());
         assertEquals(exception.getMessage(), "Topic not found");
     }
+
+    @DataProvider(name = "partitioned")
+    public static Object[][] partitioned() {
+        return new Object[][] {
+                {true},
+                {false}
+        };
+    }
+
+    @Test(dataProvider = "partitioned")
+    public void testCreateSubscriptionWithProperties(boolean partitioned) throws Exception {
+        String uuid = UUID.randomUUID().toString();
+        String topic = uuid + "-" + partitioned;
+
+        if (partitioned) {
+            admin.topics().createPartitionedTopic(topic, 4);
+        } else {
+            admin.topics().createNonPartitionedTopic(topic);
+        }
+
+        String subscriptionName = "sub";
+        Map<String, String> properties = new HashMap<>();
+        // test characters that often have problems in query strings
+        String value = "bar{}€/&:#[] ?'\"";
+        properties.put("foo", value);
+        admin.topics().createSubscription(topic, subscriptionName,
+                MessageId.latest, false, properties);
+
+        // null properties (old clients)
+        String subscriptionName2 = "sub2";
+        admin.topics().createSubscription(topic, subscriptionName2,
+                MessageId.latest, false, null);
+
+        if (partitioned) {
+            PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic);
+            for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
+                SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
+                        .getSubscriptions().get(subscriptionName);
+                assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+            }
+
+            // properties are never null, but an empty map
+            for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
+                SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
+                        .getSubscriptions().get(subscriptionName2);
+                assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+            }
+
+            // aggregated properties
+            SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false)
+                    .getSubscriptions().get(subscriptionName);
+            assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+
+        } else {
+            SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+            assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+
+            SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2);
+            assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
+        }
+
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index f78bfde7406..a885f3527ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -601,7 +601,7 @@ public class TransactionTest extends TransactionTestBase {
         persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
         PersistentSubscription persistentSubscription = (PersistentSubscription) persistentTopic
                 .createSubscription("test",
-                CommandSubscribe.InitialPosition.Earliest, false).get();
+                CommandSubscribe.InitialPosition.Earliest, false, null).get();
 
         ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class);
         doReturn(true).when(managedCursor).hasMoreEntries();
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 27c13ef137e..7b533030961 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1595,7 +1595,78 @@ public interface Topics {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    void createSubscription(String topic, String subscriptionName, MessageId messageId)
+    default void createSubscription(String topic, String subscriptionName, MessageId messageId)
+            throws PulsarAdminException {
+        createSubscription(topic, subscriptionName, messageId, false);
+    }
+
+    /**
+     * Create a new subscription on a topic.
+     *
+     * @param topic
+     *            topic name
+     * @param subscriptionName
+     *            Subscription name
+     * @param messageId
+     *            The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest},
+     *            {@link MessageId#earliest} or a specific message id.
+     */
+    default CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName,
+                                                            MessageId messageId) {
+        return createSubscriptionAsync(topic, subscriptionName, messageId, false);
+    }
+
+    /**
+     * Create a new subscription on a topic.
+     *
+     * @param topic
+     *            topic name
+     * @param subscriptionName
+     *            Subscription name
+     * @param messageId
+     *            The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest},
+     *            {@link MessageId#earliest} or a specific message id.
+     * @param replicated
+     *            replicated subscriptions.
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws ConflictException
+     *             Subscription already exists
+     * @throws NotAllowedException
+     *             Command disallowed for requested resource
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    default void createSubscription(String topic, String subscriptionName, MessageId messageId, boolean replicated)
+            throws PulsarAdminException {
+        createSubscription(topic, subscriptionName, messageId, replicated, null);
+    }
+
+    /**
+     * Create a new subscription on a topic.
+     *
+     * @param topic
+     *            topic name
+     * @param subscriptionName
+     *            Subscription name
+     * @param messageId
+     *            The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest},
+     *            {@link MessageId#earliest} or a specific message id.
+     * @param replicated
+     *            replicated subscriptions.
+     * @param properties
+     *            subscription properties.
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws ConflictException
+     *             Subscription already exists
+     * @throws NotAllowedException
+     *             Command disallowed for requested resource
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void createSubscription(String topic, String subscriptionName, MessageId messageId, boolean replicated,
+                            Map<String, String> properties)
             throws PulsarAdminException;
 
     /**
@@ -1609,7 +1680,29 @@ public interface Topics {
      *            The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest},
      *            {@link MessageId#earliest} or a specific message id.
      */
-    CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName, MessageId messageId);
+    default CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName, MessageId messageId,
+                                                    boolean replicated) {
+        return createSubscriptionAsync(topic, subscriptionName, messageId, replicated, null);
+    }
+
+    /**
+     * Create a new subscription on a topic.
+     *
+     * @param topic
+     *            topic name
+     * @param subscriptionName
+     *            Subscription name
+     * @param messageId
+     *            The {@link MessageId} at which to initialize the subscription. It can be {@link MessageId#latest},
+     *            {@link MessageId#earliest} or a specific message id.
+     *
+     * @param replicated
+     *            whether to create the subscription as a replicated subscription.
+     * @param properties
+     *            subscription properties; may be {@code null} or empty.
+     */
+    CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName, MessageId messageId,
+                                                    boolean replicated, Map<String, String> properties);
 
     /**
      * Reset cursor position on a topic subscription.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 66be8e7266f..1667cc4f90a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1097,25 +1097,27 @@ public class TopicsImpl extends BaseResource implements Topics {
 
 
     @Override
-    public void createSubscription(String topic, String subscriptionName, MessageId messageId)
+    public void createSubscription(String topic, String subscriptionName, MessageId messageId, boolean replicated,
+                                   Map<String, String> properties)
             throws PulsarAdminException {
-        try {
-            TopicName tn = validateTopic(topic);
-            String encodedSubName = Codec.encode(subscriptionName);
-            WebTarget path = topicPath(tn, "subscription", encodedSubName);
-            request(path).put(Entity.entity(messageId, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
-        }
+        sync(() -> createSubscriptionAsync(topic, subscriptionName, messageId, replicated, properties));
     }
 
     @Override
     public CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName,
-            MessageId messageId) {
+            MessageId messageId, boolean replicated, Map<String, String> properties) {
         TopicName tn = validateTopic(topic);
         String encodedSubName = Codec.encode(subscriptionName);
         WebTarget path = topicPath(tn, "subscription", encodedSubName);
-        return asyncPutRequest(path, Entity.entity(messageId, MediaType.APPLICATION_JSON));
+        path = path.queryParam("replicated", replicated);
+        Object payload = messageId;
+        if (properties != null && !properties.isEmpty()) {
+            ResetCursorData resetCursorData = messageId != null
+                    ? new ResetCursorData(messageId) : new ResetCursorData(MessageId.latest);
+            resetCursorData.setProperties(properties);
+            payload = resetCursorData;
+        }
+        return asyncPutRequest(path, Entity.entity(payload, MediaType.APPLICATION_JSON));
     }
 
     @Override
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 464e7aa1fab..f2bee263d15 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1409,7 +1409,18 @@ public class PulsarAdminToolTest {
         verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 100);
 
         cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest"));
-        verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest);
+        verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false, null);
+
+        // JCommander is stateful: the same command object cannot be parsed twice, so create a fresh one.
+        cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest --property a=b"));
+        Map<String, String> props = new HashMap<>();
+        props.put("a", "b");
+        verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false, props);
+
+        cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest -r"));
+        verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, true, null);
 
         cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32"));
         verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null);
@@ -1418,7 +1429,7 @@ public class PulsarAdminToolTest {
         verify(mockTopics).createMissedPartitions("persistent://myprop/clust/ns1/ds1");
 
         cmdTopics.run(split("create persistent://myprop/clust/ns1/ds1"));
-        verify(mockTopics).createNonPartitionedTopic("persistent://myprop/clust/ns1/ds1", new HashMap<>());
+        verify(mockTopics).createNonPartitionedTopic("persistent://myprop/clust/ns1/ds1", null);
 
         cmdTopics.run(split("list-partitioned-topics myprop/clust/ns1"));
         verify(mockTopics).getPartitionedTopicList("myprop/clust/ns1");
@@ -1801,8 +1812,16 @@ public class PulsarAdminToolTest {
         topics.run(split("expire-messages-all-subscriptions persistent://myprop/clust/ns1/ds1 -t 100"));
         verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 100);
 
+        topics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest -p a=b --property c=d"));
+        Map<String, String> props = new HashMap<>();
+        props.put("a", "b");
+        props.put("c", "d");
+        verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false, props);
+
+        // JCommander is stateful: the same command object cannot be parsed twice, so create a fresh one.
+        topics = new CmdPersistentTopics(() -> admin);
         topics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest"));
-        verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest);
+        verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false, null);
 
         topics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32"));
         verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
index fdff07e2b85..7a5b656586c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
@@ -23,6 +23,9 @@ import com.beust.jcommander.IUsageFormatter;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.function.Supplier;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -110,4 +113,21 @@ public abstract class CmdBase {
         }
         return admin;
     }
+
+
+    static Map<String, String> parseListKeyValueMap(List<String> metadata) {
+        Map<String, String> map = null;
+        if (metadata != null && !metadata.isEmpty()) {
+            map = new HashMap<>();
+            for (String property : metadata) {
+                int pos = property.indexOf('=');
+                if (pos <= 0) {
+                    throw new ParameterException(String.format("Invalid key value pair '%s', "
+                            + "valid format like 'a=b'.", property));
+                }
+                map.put(property.substring(0, pos), property.substring(pos + 1));
+            }
+        }
+        return map;
+    }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 0d77908b090..526b8818ada 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Sets;
 import io.swagger.util.Json;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -2403,24 +2402,11 @@ public class CmdNamespaces extends CmdBase {
         @Override
         void run() throws Exception {
             String namespace = validateNamespace(params);
-            Map<String, String> map = new HashMap<>();
             if (properties.size() == 0) {
                 throw new ParameterException(String.format("Required at least one property for the namespace, "
                         + "but found %d.", properties.size()));
             }
-            for (String property : properties) {
-                if (!property.contains("=")) {
-                    throw new ParameterException(String.format("Invalid key value pair '%s', "
-                            + "valid format like 'a=a,b=b,c=c'.", property));
-                } else {
-                    String[] keyValue = property.split("=");
-                    if (keyValue.length != 2) {
-                        throw new ParameterException(String.format("Invalid key value pair '%s', "
-                                + "valid format like 'a=a,b=b,c=c'.", property));
-                    }
-                    map.put(keyValue[0], keyValue[1]);
-                }
-            }
+            Map<String, String> map = parseListKeyValueMap(properties);
             getAdmin().namespaces().setProperties(namespace, map);
         }
     }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
index 766be620e87..7e0cf21245a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
@@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -496,6 +497,10 @@ public class CmdPersistentTopics extends CmdBase {
                 + "It can be either 'latest', 'earliest' or (ledgerId:entryId)", required = false)
         private String messageIdStr = "latest";
 
+        @Parameter(names = {"--property", "-p"}, description = "key value pair properties(-p a=b -p c=d)",
+                required = false)
+        private java.util.List<String> properties;
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
@@ -507,8 +512,8 @@ public class CmdPersistentTopics extends CmdBase {
             } else {
                 messageId = validateMessageIdString(messageIdStr);
             }
-
-            getPersistentTopics().createSubscription(persistentTopic, subscriptionName, messageId);
+            Map<String, String> map = parseListKeyValueMap(properties);
+            getPersistentTopics().createSubscription(persistentTopic, subscriptionName, messageId, false, map);
         }
     }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index e9251326785..883617f89b9 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -517,23 +517,7 @@ public class CmdTopics extends CmdBase {
         @Override
         void run() throws Exception {
             String topic = validateTopicName(params);
-            Map<String, String> map = null;
-            if (metadata != null && !metadata.isEmpty()) {
-                map = new HashMap<>();
-                for (String property : metadata) {
-                    if (!property.contains("=")) {
-                        throw new ParameterException(String.format("Invalid key value pair '%s', "
-                                + "valid format like 'a=a,b=b,c=c'.", property));
-                    } else {
-                        String[] keyValue = property.split("=");
-                        if (keyValue.length != 2) {
-                            throw new ParameterException(String.format("Invalid key value pair '%s', "
-                                    + "valid format like 'a=a,b=b,c=c'.", property));
-                        }
-                        map.put(keyValue[0], keyValue[1]);
-                    }
-                }
-            }
+            Map<String, String> map = parseListKeyValueMap(metadata);
             getTopics().createPartitionedTopic(topic, numPartitions, map);
         }
     }
@@ -565,22 +549,7 @@ public class CmdTopics extends CmdBase {
         @Override
         void run() throws Exception {
             String topic = validateTopicName(params);
-            Map<String, String> map = new HashMap<>();
-            if (metadata != null) {
-                for (String property : metadata) {
-                    if (!property.contains("=")) {
-                        throw new ParameterException(String.format("Invalid key value pair '%s', "
-                                + "valid format like 'a=a,b=b,c=c'.", property));
-                    } else {
-                        String[] keyValue = property.split("=");
-                        if (keyValue.length != 2) {
-                            throw new ParameterException(String.format("Invalid key value pair '%s', "
-                                    + "valid format like 'a=a,b=b,c=c'.", property));
-                        }
-                        map.put(keyValue[0], keyValue[1]);
-                    }
-                }
-            }
+            Map<String, String> map = parseListKeyValueMap(metadata);
             getTopics().createNonPartitionedTopic(topic, map);
         }
     }
@@ -930,6 +899,10 @@ public class CmdTopics extends CmdBase {
                 + "It can be either 'latest', 'earliest' or (ledgerId:entryId)", required = false)
         private String messageIdStr = "latest";
 
+        @Parameter(names = {"--property", "-p"}, description = "key value pair properties(-p a=b -p c=d)",
+                required = false)
+        private java.util.List<String> properties;
+
         @Override
         void run() throws PulsarAdminException {
             String topic = validateTopicName(params);
@@ -941,8 +914,8 @@ public class CmdTopics extends CmdBase {
             } else {
                 messageId = validateMessageIdString(messageIdStr);
             }
-
-            getTopics().createSubscription(topic, subscriptionName, messageId);
+            Map<String, String> map = parseListKeyValueMap(properties);
+            getTopics().createSubscription(topic, subscriptionName, messageId, false, map);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
index b8399807304..03b6366018e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
-
+import java.util.Map;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.pulsar.client.api.MessageId;
@@ -31,6 +31,7 @@ public class ResetCursorData {
     protected int partitionIndex = -1;
     protected boolean isExcluded = false;
     protected int batchIndex = -1;
+    protected Map<String, String> properties;
 
     public ResetCursorData(long ledgerId, long entryId) {
         this.ledgerId = ledgerId;
@@ -43,6 +44,13 @@ public class ResetCursorData {
         this.isExcluded = isExcluded;
     }
 
+    public ResetCursorData(long ledgerId, long entryId, boolean isExcluded, Map<String, String> properties) {
+        this.ledgerId = ledgerId;
+        this.entryId = entryId;
+        this.isExcluded = isExcluded;
+        this.properties = properties;
+    }
+
     // Private constructor used only for json deserialization
     private ResetCursorData(String position) {
         if ("latest".equals(position)) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index f4dc1c8ecf0..b0da409e0fe 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -173,6 +173,31 @@ public class CLITest extends PulsarTestSuite {
         }
     }
 
+    @Test
+    public void testCreateSubscriptionWithPropertiesCommand() throws Exception {
+        String topic = "testCreateSubscriptionCommmand";
+
+        String subscriptionPrefix = "subscription-";
+
+        int i = 0;
+        for (BrokerContainer container : pulsarCluster.getBrokers()) {
+            ContainerExecResult result = container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "topics",
+                    "create-subscription",
+                    "-p",
+                    "a=b",
+                    "-p",
+                    "c=d",
+                    "persistent://public/default/" + topic,
+                    "--subscription",
+                    "" + subscriptionPrefix + i
+            );
+            result.assertNoOutput();
+            i++;
+        }
+    }
+
     @Test
     public void testTopicTerminationOnTopicsWithoutConnectedConsumers() throws Exception {
         String topicName = "persistent://public/default/test-topic-termination";