You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/05/23 15:56:37 UTC
[pulsar] branch branch-2.10 updated: [enh] Issue 15455: Pulsar Admin: create subscripion with Properties (PIP-105) (#15503) (#15681)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new c7f9ffdf229 [enh] Issue 15455: Pulsar Admin: create subscripion with Properties (PIP-105) (#15503) (#15681)
c7f9ffdf229 is described below
commit c7f9ffdf22933f74f255e71fb512ee5c48fa4f45
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Mon May 23 17:56:32 2022 +0200
[enh] Issue 15455: Pulsar Admin: create subscripion with Properties (PIP-105) (#15503) (#15681)
(cherry picked from commit e2fa1898f7e9a38b1743b2abf21c87eafe126ee2)
---
.../broker/admin/impl/PersistentTopicsBase.java | 19 +++--
.../pulsar/broker/admin/v1/PersistentTopics.java | 3 +-
.../pulsar/broker/admin/v2/PersistentTopics.java | 5 +-
.../apache/pulsar/broker/service/ServerCnx.java | 2 +-
.../apache/pulsar/broker/service/Subscription.java | 2 +
.../org/apache/pulsar/broker/service/Topic.java | 2 +-
.../nonpersistent/NonPersistentSubscription.java | 10 ++-
.../service/nonpersistent/NonPersistentTopic.java | 14 ++--
.../service/persistent/PersistentSubscription.java | 1 +
.../broker/service/persistent/PersistentTopic.java | 7 +-
.../ReplicatedSubscriptionsController.java | 2 +-
.../broker/admin/AdminApiSubscriptionTest.java | 69 +++++++++++++++
.../pulsar/broker/transaction/TransactionTest.java | 2 +-
.../org/apache/pulsar/client/admin/Topics.java | 97 +++++++++++++++++++++-
.../pulsar/client/admin/internal/TopicsImpl.java | 24 +++---
.../pulsar/admin/cli/PulsarAdminToolTest.java | 25 +++++-
.../java/org/apache/pulsar/admin/cli/CmdBase.java | 20 +++++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 16 +---
.../pulsar/admin/cli/CmdPersistentTopics.java | 9 +-
.../org/apache/pulsar/admin/cli/CmdTopics.java | 43 ++--------
.../apache/pulsar/client/impl/ResetCursorData.java | 10 ++-
.../pulsar/tests/integration/cli/CLITest.java | 25 ++++++
22 files changed, 313 insertions(+), 94 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b286a6000dc..4228ac6254d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2114,7 +2114,7 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalCreateSubscription(AsyncResponse asyncResponse, String subscriptionName,
- MessageIdImpl messageId, boolean authoritative, boolean replicated) {
+ MessageIdImpl messageId, boolean authoritative, boolean replicated, Map<String, String> properties) {
CompletableFuture<Void> ret;
if (topicName.isGlobal()) {
ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
@@ -2123,12 +2123,12 @@ public class PersistentTopicsBase extends AdminResource {
}
ret.thenAccept(__ -> {
final MessageIdImpl targetMessageId = messageId == null ? (MessageIdImpl) MessageId.latest : messageId;
- log.info("[{}][{}] Creating subscription {} at message id {}", clientAppId(), topicName, subscriptionName,
- targetMessageId);
+ log.info("[{}][{}] Creating subscription {} at message id {} with properties {}", clientAppId(),
+ topicName, subscriptionName, targetMessageId, properties);
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
- subscriptionName, targetMessageId, authoritative, replicated);
+ subscriptionName, targetMessageId, authoritative, replicated, properties);
} else {
boolean allowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);
getPartitionedTopicMetadataAsync(topicName,
@@ -2146,7 +2146,7 @@ public class PersistentTopicsBase extends AdminResource {
try {
pulsar().getAdminClient().topics()
.createSubscriptionAsync(topicNamePartition.toString(),
- subscriptionName, targetMessageId)
+ subscriptionName, targetMessageId, false, properties)
.handle((r, ex) -> {
if (ex != null) {
// fail the operation on unknown exception or
@@ -2200,7 +2200,7 @@ public class PersistentTopicsBase extends AdminResource {
});
} else {
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
- subscriptionName, targetMessageId, authoritative, replicated);
+ subscriptionName, targetMessageId, authoritative, replicated, properties);
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
@@ -2225,7 +2225,8 @@ public class PersistentTopicsBase extends AdminResource {
private void internalCreateSubscriptionForNonPartitionedTopic(
AsyncResponse asyncResponse, String subscriptionName,
- MessageIdImpl targetMessageId, boolean authoritative, boolean replicated) {
+ MessageIdImpl targetMessageId, boolean authoritative, boolean replicated,
+ Map<String, String> properties) {
boolean isAllowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);
@@ -2245,7 +2246,7 @@ public class PersistentTopicsBase extends AdminResource {
throw new RestException(Status.CONFLICT, "Subscription already exists for topic");
}
- return topic.createSubscription(subscriptionName, InitialPosition.Latest, replicated);
+ return topic.createSubscription(subscriptionName, InitialPosition.Latest, replicated, properties);
}).thenCompose(subscription -> {
// Mark the cursor as "inactive" as it was created without a real consumer connected
((PersistentSubscription) subscription).deactivateCursor();
@@ -4004,7 +4005,7 @@ public class PersistentTopicsBase extends AdminResource {
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
sub = topic.createSubscription(subName,
- InitialPosition.Earliest, false).get();
+ InitialPosition.Earliest, false, null).get();
}
return checkNotNull(sub);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index fcb71566f6b..283ba0ebccd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -652,7 +652,8 @@ public class PersistentTopics extends PersistentTopicsBase {
throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic "
+ "can only be done through client");
}
- internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated);
+ internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated,
+ null);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 94a2ae7536c..fd5cb75b10b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1410,10 +1410,13 @@ public class PersistentTopics extends PersistentTopicsBase {
throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic "
+ "can only be done through client");
}
+ Map<String, String> subscriptionProperties = resetCursorData == null ? null :
+ resetCursorData.getProperties();
MessageIdImpl messageId = resetCursorData == null ? null :
new MessageIdImpl(resetCursorData.getLedgerId(), resetCursorData.getEntryId(),
resetCursorData.getPartitionIndex());
- internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated);
+ internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative,
+ replicated, subscriptionProperties);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0c102a1198d..3be316691b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1304,7 +1304,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
createInitSubFuture =
topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest,
- false);
+ false, null);
} else {
createInitSubFuture = CompletableFuture.completedFuture(null);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 06db648c15f..b1ccb4d1eb0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -101,6 +101,8 @@ public interface Subscription {
void addUnAckedMessages(int unAckMessages);
+ Map<String, String> getSubscriptionProperties();
+
default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
// Default is no-op
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 472bf4efdb8..fc1eadf47e4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -164,7 +164,7 @@ public interface Topic {
CompletableFuture<Consumer> subscribe(SubscriptionOption option);
CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
- boolean replicateSubscriptionState);
+ boolean replicateSubscriptionState, Map<String, String> properties);
CompletableFuture<Void> unsubscribe(String subName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 790725fe8bb..ae49b3623ca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -68,13 +68,15 @@ public class NonPersistentSubscription implements Subscription {
private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
+ private final Map<String, String> subscriptionProperties;
// If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription.
private final boolean isDurable;
private KeySharedMode keySharedMode = null;
- public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName, boolean isDurable) {
+ public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName, boolean isDurable,
+ Map<String, String> properties) {
this.topic = topic;
this.topicName = topic.getName();
this.subName = subscriptionName;
@@ -82,6 +84,8 @@ public class NonPersistentSubscription implements Subscription {
IS_FENCED_UPDATER.set(this, FALSE);
this.lastActive = System.currentTimeMillis();
this.isDurable = isDurable;
+ this.subscriptionProperties = properties != null
+ ? Collections.unmodifiableMap(properties) : Collections.emptyMap();
}
@Override
@@ -518,4 +522,8 @@ public class NonPersistentSubscription implements Subscription {
public void updateLastActive() {
this.lastActive = System.currentTimeMillis();
}
+
+ public Map<String, String> getSubscriptionProperties() {
+ return subscriptionProperties;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 61c9779dbcf..82d610c12b7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -249,7 +249,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
option.isDurable(), option.getStartMessageId(), option.getMetadata(),
option.isReadCompacted(), option.getInitialPosition(),
option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(),
- option.getKeySharedMeta());
+ option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null));
}
@Override
@@ -262,7 +262,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
KeySharedMeta keySharedMeta) {
return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName,
isDurable, startMessageId, metadata, readCompacted, initialPosition, resetStartMessageBackInSec,
- replicateSubscriptionState, keySharedMeta);
+ replicateSubscriptionState, keySharedMeta, null);
}
private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
@@ -272,7 +272,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
boolean readCompacted, InitialPosition initialPosition,
long resetStartMessageBackInSec,
boolean replicateSubscriptionState,
- KeySharedMeta keySharedMeta) {
+ KeySharedMeta keySharedMeta,
+ Map<String, String> subscriptionProperties) {
return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
final CompletableFuture<Consumer> future = new CompletableFuture<>();
@@ -312,7 +313,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
}
NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
- name -> new NonPersistentSubscription(this, subscriptionName, isDurable));
+ name -> new NonPersistentSubscription(this, subscriptionName, isDurable, subscriptionProperties));
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
false, cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta,
@@ -364,8 +365,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
@Override
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
- boolean replicateSubscriptionState) {
- return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName, true));
+ boolean replicateSubscriptionState, Map<String, String> properties) {
+ return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName, true,
+ properties));
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index ec77fe69eed..3b23e86f9ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1089,6 +1089,7 @@ public class PersistentSubscription implements Subscription {
}
}
+ @Override
public Map<String, String> getSubscriptionProperties() {
return subscriptionProperties;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6e7efc0a90a..acb965f0429 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -994,9 +994,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
@Override
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
- boolean replicateSubscriptionState) {
+ boolean replicateSubscriptionState,
+ Map<String, String> subscriptionProperties) {
return getDurableSubscription(subscriptionName, initialPosition,
- 0 /*avoid reseting cursor*/, replicateSubscriptionState, null);
+ 0 /*avoid reseting cursor*/, replicateSubscriptionState, subscriptionProperties);
}
/**
@@ -1482,7 +1483,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return isCompactionEnabled()
// If a topic has a compaction policy setup, we must make sure that the compaction cursor
// is pre-created, in order to ensure all the data will be seen by the compactor.
- ? createSubscription(COMPACTION_SUBSCRIPTION, CommandSubscribe.InitialPosition.Earliest, false)
+ ? createSubscription(COMPACTION_SUBSCRIPTION, CommandSubscribe.InitialPosition.Earliest, false, null)
.thenCompose(__ -> CompletableFuture.completedFuture(null))
: CompletableFuture.completedFuture(null);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index 2b1ae4ba193..a77ac76be8f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -195,7 +195,7 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription",
topic, update.getSubscriptionName(), updatedMessageId.getLedgerId(), pos);
topic.createSubscription(update.getSubscriptionName(),
- InitialPosition.Latest, true /* replicateSubscriptionState */);
+ InitialPosition.Latest, true /* replicateSubscriptionState */, null);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
index 8e4a1d54a3e..8d25c61e1dd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
@@ -19,15 +19,22 @@
package org.apache.pulsar.broker.admin;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
+
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@@ -106,4 +113,66 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
assertEquals(exception.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode());
assertEquals(exception.getMessage(), "Topic not found");
}
+
+ @DataProvider(name = "partitioned")
+ public static Object[][] partitioned() {
+ return new Object[][] {
+ {true},
+ {false}
+ };
+ }
+
+ @Test(dataProvider = "partitioned")
+ public void testCreateSubscriptionWithProperties(boolean partitioned) throws Exception {
+ String uuid = UUID.randomUUID().toString();
+ String topic = uuid + "-" + partitioned;
+
+ if (partitioned) {
+ admin.topics().createPartitionedTopic(topic, 4);
+ } else {
+ admin.topics().createNonPartitionedTopic(topic);
+ }
+
+ String subscriptionName = "sub";
+ Map<String, String> properties = new HashMap<>();
+ // test characters that often have problems in query strings
+ String value = "bar{}€/&:#[] ?'\"";
+ properties.put("foo", value);
+ admin.topics().createSubscription(topic, subscriptionName,
+ MessageId.latest, false, properties);
+
+ // null properties (old clients)
+ String subscriptionName2 = "sub2";
+ admin.topics().createSubscription(topic, subscriptionName2,
+ MessageId.latest, false, null);
+
+ if (partitioned) {
+ PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic);
+ for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
+ SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
+ .getSubscriptions().get(subscriptionName);
+ assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+ }
+
+ // properties are never null, but an empty map
+ for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
+ SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
+ .getSubscriptions().get(subscriptionName2);
+ assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+ }
+
+ // aggregated properties
+ SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false)
+ .getSubscriptions().get(subscriptionName);
+ assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+
+ } else {
+ SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+ assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+
+ SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2);
+ assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
+ }
+
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index f78bfde7406..a885f3527ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -601,7 +601,7 @@ public class TransactionTest extends TransactionTestBase {
persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
PersistentSubscription persistentSubscription = (PersistentSubscription) persistentTopic
.createSubscription("test",
- CommandSubscribe.InitialPosition.Earliest, false).get();
+ CommandSubscribe.InitialPosition.Earliest, false, null).get();
ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class);
doReturn(true).when(managedCursor).hasMoreEntries();
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 27c13ef137e..7b533030961 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1595,7 +1595,78 @@ public interface Topics {
* @throws PulsarAdminException
* Unexpected error
*/
- void createSubscription(String topic, String subscriptionName, MessageId messageId)
+ default void createSubscription(String topic, String subscriptionName, MessageId messageId)
+ throws PulsarAdminException {
+ createSubscription(topic, subscriptionName, messageId, false);
+ };
+
+ /**
+ * Create a new subscription on a topic.
+ *
+ * @param topic
+ * topic name
+ * @param subscriptionName
+ * Subscription name
+ * @param messageId
+ * The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest},
+ * {@link MessageId#earliest} or a specific message id.
+ */
+ default CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName,
+ MessageId messageId) {
+ return createSubscriptionAsync(topic, subscriptionName, messageId, false);
+ }
+
+ /**
+ * Create a new subscription on a topic.
+ *
+ * @param topic
+ * topic name
+ * @param subscriptionName
+ * Subscription name
+ * @param messageId
+ * The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest},
+ * {@link MessageId#earliest} or a specific message id.
+ * @param replicated
+ * replicated subscriptions.
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws ConflictException
+ * Subscription already exists
+ * @throws NotAllowedException
+ * Command disallowed for requested resource
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ default void createSubscription(String topic, String subscriptionName, MessageId messageId, boolean replicated)
+ throws PulsarAdminException {
+ createSubscription(topic, subscriptionName, messageId, replicated, null);
+ }
+
+ /**
+ * Create a new subscription on a topic.
+ *
+ * @param topic
+ * topic name
+ * @param subscriptionName
+ * Subscription name
+ * @param messageId
+ * The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest},
+ * {@link MessageId#earliest} or a specific message id.
+ * @param replicated
+ * replicated subscriptions.
+ * @param properties
+ * subscription properties.
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws ConflictException
+ * Subscription already exists
+ * @throws NotAllowedException
+ * Command disallowed for requested resource
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void createSubscription(String topic, String subscriptionName, MessageId messageId, boolean replicated,
+ Map<String, String> properties)
throws PulsarAdminException;
/**
@@ -1609,7 +1680,29 @@ public interface Topics {
* The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest},
* {@link MessageId#earliest} or a specific message id.
*/
- CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName, MessageId messageId);
+ default CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName, MessageId messageId,
+ boolean replicated) {
+ return createSubscriptionAsync(topic, subscriptionName, messageId, replicated, null);
+ }
+
+ /**
+ * Create a new subscription on a topic.
+ *
+ * @param topic
+ * topic name
+ * @param subscriptionName
+ * Subscription name
+ * @param messageId
+ * The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest},
+ * {@link MessageId#earliest} or a specific message id.
+ *
+ * @param replicated
+ * replicated subscriptions.
+ * @param properties
+ * subscription properties.
+ */
+ CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName, MessageId messageId,
+ boolean replicated, Map<String, String> properties);
/**
* Reset cursor position on a topic subscription.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 66be8e7266f..1667cc4f90a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1097,25 +1097,27 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
- public void createSubscription(String topic, String subscriptionName, MessageId messageId)
+ public void createSubscription(String topic, String subscriptionName, MessageId messageId, boolean replicated,
+ Map<String, String> properties)
throws PulsarAdminException {
- try {
- TopicName tn = validateTopic(topic);
- String encodedSubName = Codec.encode(subscriptionName);
- WebTarget path = topicPath(tn, "subscription", encodedSubName);
- request(path).put(Entity.entity(messageId, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
- }
+ sync(() -> createSubscriptionAsync(topic, subscriptionName, messageId, replicated, properties));
}
@Override
public CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName,
- MessageId messageId) {
+ MessageId messageId, boolean replicated, Map<String, String> properties) {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subscriptionName);
WebTarget path = topicPath(tn, "subscription", encodedSubName);
- return asyncPutRequest(path, Entity.entity(messageId, MediaType.APPLICATION_JSON));
+ path = path.queryParam("replicated", replicated);
+ Object payload = messageId;
+ if (properties != null && !properties.isEmpty()) {
+ ResetCursorData resetCursorData = messageId != null
+ ? new ResetCursorData(messageId) : new ResetCursorData(MessageId.latest);
+ resetCursorData.setProperties(properties);
+ payload = resetCursorData;
+ }
+ return asyncPutRequest(path, Entity.entity(payload, MediaType.APPLICATION_JSON));
}
@Override
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 464e7aa1fab..f2bee263d15 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1409,7 +1409,18 @@ public class PulsarAdminToolTest {
verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 100);
cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest"));
- verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest);
+ verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false, null);
+
+ // jcommander is stateful, you cannot parse the same command twice
+ cmdTopics = new CmdTopics(() -> admin);
+ cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest --property a=b"));
+ Map<String, String> props = new HashMap<>();
+ props.put("a", "b");
+ verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false, props);
+
+ cmdTopics = new CmdTopics(() -> admin);
+ cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest -r"));
+ verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, true, null);
cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32"));
verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null);
@@ -1418,7 +1429,7 @@ public class PulsarAdminToolTest {
verify(mockTopics).createMissedPartitions("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("create persistent://myprop/clust/ns1/ds1"));
- verify(mockTopics).createNonPartitionedTopic("persistent://myprop/clust/ns1/ds1", new HashMap<>());
+ verify(mockTopics).createNonPartitionedTopic("persistent://myprop/clust/ns1/ds1", null);
cmdTopics.run(split("list-partitioned-topics myprop/clust/ns1"));
verify(mockTopics).getPartitionedTopicList("myprop/clust/ns1");
@@ -1801,8 +1812,16 @@ public class PulsarAdminToolTest {
topics.run(split("expire-messages-all-subscriptions persistent://myprop/clust/ns1/ds1 -t 100"));
verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 100);
+ topics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest -p a=b --property c=d"));
+ Map<String, String> props = new HashMap<>();
+ props.put("a", "b");
+ props.put("c", "d");
+ verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false, props);
+
+ // jcommander is stateful, you cannot parse the same command twice
+ topics = new CmdPersistentTopics(() -> admin);
topics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest"));
- verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest);
+ verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false, null);
topics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32"));
verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
index fdff07e2b85..7a5b656586c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
@@ -23,6 +23,9 @@ import com.beust.jcommander.IUsageFormatter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.function.Supplier;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -110,4 +113,21 @@ public abstract class CmdBase {
}
return admin;
}
+
+
+ static Map<String, String> parseListKeyValueMap(List<String> metadata) {
+ Map<String, String> map = null;
+ if (metadata != null && !metadata.isEmpty()) {
+ map = new HashMap<>();
+ for (String property : metadata) {
+ int pos = property.indexOf('=');
+ if (pos <= 0) {
+ throw new ParameterException(String.format("Invalid key value pair '%s', "
+ + "valid format like 'a=b'.", property));
+ }
+ map.put(property.substring(0, pos), property.substring(pos + 1));
+ }
+ }
+ return map;
+ }
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 0d77908b090..526b8818ada 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Sets;
import io.swagger.util.Json;
import java.io.IOException;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -2403,24 +2402,11 @@ public class CmdNamespaces extends CmdBase {
@Override
void run() throws Exception {
String namespace = validateNamespace(params);
- Map<String, String> map = new HashMap<>();
if (properties.size() == 0) {
throw new ParameterException(String.format("Required at least one property for the namespace, "
+ "but found %d.", properties.size()));
}
- for (String property : properties) {
- if (!property.contains("=")) {
- throw new ParameterException(String.format("Invalid key value pair '%s', "
- + "valid format like 'a=a,b=b,c=c'.", property));
- } else {
- String[] keyValue = property.split("=");
- if (keyValue.length != 2) {
- throw new ParameterException(String.format("Invalid key value pair '%s', "
- + "valid format like 'a=a,b=b,c=c'.", property));
- }
- map.put(keyValue[0], keyValue[1]);
- }
- }
+ Map<String, String> map = parseListKeyValueMap(properties);
getAdmin().namespaces().setProperties(namespace, map);
}
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
index 766be620e87..7e0cf21245a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
@@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -496,6 +497,10 @@ public class CmdPersistentTopics extends CmdBase {
+ "It can be either 'latest', 'earliest' or (ledgerId:entryId)", required = false)
private String messageIdStr = "latest";
+ @Parameter(names = {"--property", "-p"}, description = "key value pair properties(-p a=b -p c=d)",
+ required = false)
+ private java.util.List<String> properties;
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
@@ -507,8 +512,8 @@ public class CmdPersistentTopics extends CmdBase {
} else {
messageId = validateMessageIdString(messageIdStr);
}
-
- getPersistentTopics().createSubscription(persistentTopic, subscriptionName, messageId);
+ Map<String, String> map = parseListKeyValueMap(properties);
+ getPersistentTopics().createSubscription(persistentTopic, subscriptionName, messageId, false, map);
}
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index e9251326785..883617f89b9 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -517,23 +517,7 @@ public class CmdTopics extends CmdBase {
@Override
void run() throws Exception {
String topic = validateTopicName(params);
- Map<String, String> map = null;
- if (metadata != null && !metadata.isEmpty()) {
- map = new HashMap<>();
- for (String property : metadata) {
- if (!property.contains("=")) {
- throw new ParameterException(String.format("Invalid key value pair '%s', "
- + "valid format like 'a=a,b=b,c=c'.", property));
- } else {
- String[] keyValue = property.split("=");
- if (keyValue.length != 2) {
- throw new ParameterException(String.format("Invalid key value pair '%s', "
- + "valid format like 'a=a,b=b,c=c'.", property));
- }
- map.put(keyValue[0], keyValue[1]);
- }
- }
- }
+ Map<String, String> map = parseListKeyValueMap(metadata);
getTopics().createPartitionedTopic(topic, numPartitions, map);
}
}
@@ -565,22 +549,7 @@ public class CmdTopics extends CmdBase {
@Override
void run() throws Exception {
String topic = validateTopicName(params);
- Map<String, String> map = new HashMap<>();
- if (metadata != null) {
- for (String property : metadata) {
- if (!property.contains("=")) {
- throw new ParameterException(String.format("Invalid key value pair '%s', "
- + "valid format like 'a=a,b=b,c=c'.", property));
- } else {
- String[] keyValue = property.split("=");
- if (keyValue.length != 2) {
- throw new ParameterException(String.format("Invalid key value pair '%s', "
- + "valid format like 'a=a,b=b,c=c'.", property));
- }
- map.put(keyValue[0], keyValue[1]);
- }
- }
- }
+ Map<String, String> map = parseListKeyValueMap(metadata);
getTopics().createNonPartitionedTopic(topic, map);
}
}
@@ -930,6 +899,10 @@ public class CmdTopics extends CmdBase {
+ "It can be either 'latest', 'earliest' or (ledgerId:entryId)", required = false)
private String messageIdStr = "latest";
+ @Parameter(names = {"--property", "-p"}, description = "key value pair properties(-p a=b -p c=d)",
+ required = false)
+ private java.util.List<String> properties;
+
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
@@ -941,8 +914,8 @@ public class CmdTopics extends CmdBase {
} else {
messageId = validateMessageIdString(messageIdStr);
}
-
- getTopics().createSubscription(topic, subscriptionName, messageId);
+ Map<String, String> map = parseListKeyValueMap(properties);
+ getTopics().createSubscription(topic, subscriptionName, messageId, false, map);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
index b8399807304..03b6366018e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.client.impl;
-
+import java.util.Map;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.MessageId;
@@ -31,6 +31,7 @@ public class ResetCursorData {
protected int partitionIndex = -1;
protected boolean isExcluded = false;
protected int batchIndex = -1;
+ protected Map<String, String> properties;
public ResetCursorData(long ledgerId, long entryId) {
this.ledgerId = ledgerId;
@@ -43,6 +44,13 @@ public class ResetCursorData {
this.isExcluded = isExcluded;
}
+ public ResetCursorData(long ledgerId, long entryId, boolean isExcluded, Map<String, String> properties) {
+ this.ledgerId = ledgerId;
+ this.entryId = entryId;
+ this.isExcluded = isExcluded;
+ this.properties = properties;
+ }
+
// Private constructor used only for json deserialization
private ResetCursorData(String position) {
if ("latest".equals(position)) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index f4dc1c8ecf0..b0da409e0fe 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -173,6 +173,31 @@ public class CLITest extends PulsarTestSuite {
}
}
+ @Test
+ public void testCreateSubscriptionWithPropertiesCommand() throws Exception {
+ String topic = "testCreateSubscriptionCommmand";
+
+ String subscriptionPrefix = "subscription-";
+
+ int i = 0;
+ for (BrokerContainer container : pulsarCluster.getBrokers()) {
+ ContainerExecResult result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "topics",
+ "create-subscription",
+ "-p",
+ "a=b",
+ "-p",
+ "c=d",
+ "persistent://public/default/" + topic,
+ "--subscription",
+ "" + subscriptionPrefix + i
+ );
+ result.assertNoOutput();
+ i++;
+ }
+ }
+
@Test
public void testTopicTerminationOnTopicsWithoutConnectedConsumers() throws Exception {
String topicName = "persistent://public/default/test-topic-termination";