You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/03 15:26:34 UTC
[pulsar] 03/03: PIP-105 add support for updating the Subscription properties (#15751)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9dd4057d5bccdc93d29cab49cdb969f507a56739
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Fri Jun 3 13:44:16 2022 +0200
PIP-105 add support for updating the Subscription properties (#15751)
* PIP-105 add support for updating the Subscription properties
* Implement command update-subscription-properties
* Add tests
* Add volatile
* Fix PersistentTopicTest
* PIP-105: Store Subscription properties
* Fix FilterEntryTest
* Add volatile
* Fix PersistentTopicTest
* fix ServerCnxTest test
* Switch from POST to PUT
* rename to /properties
* Apply suggestions from code review
Co-authored-by: Lari Hotari <lh...@users.noreply.github.com>
Co-authored-by: Lari Hotari <lh...@users.noreply.github.com>
(cherry picked from commit 8e77e88cc54c05fa4f8b360be499c3da61607a66)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 5 +-
.../broker/admin/impl/PersistentTopicsBase.java | 104 +++++++++++++++++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 37 ++++++++
.../broker/admin/AdminApiSubscriptionTest.java | 45 +++++++++
.../org/apache/pulsar/client/admin/Topics.java | 21 +++++
.../pulsar/client/admin/internal/TopicsImpl.java | 19 ++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 16 ++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 36 +++++++
.../pulsar/tests/integration/cli/CLITest.java | 25 ++++-
9 files changed, 305 insertions(+), 3 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 1092ca0a89d..424874a8dfa 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -47,6 +47,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -324,7 +325,7 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
- ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
+ ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<ManagedCursorInfo>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
ManagedCursorInfo copy = ManagedCursorInfo
@@ -333,7 +334,7 @@ public class ManagedCursorImpl implements ManagedCursor {
.addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
.build();
ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
- name, copy, stat, new MetaStoreCallback<>() {
+ name, copy, stat, new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat stat) {
log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4228ac6254d..f118172045d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1556,6 +1556,32 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+ private void internalUpdateSubscriptionPropertiesForNonPartitionedTopic(AsyncResponse asyncResponse,
+ String subName, Map<String, String> subscriptionProperties,
+ boolean authoritative) {
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
+ .thenCompose(__ -> {
+ Topic topic = getTopicReference(topicName);
+ Subscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ throw new RestException(Status.NOT_FOUND, "Subscription not found");
+ }
+ return sub.updateSubscriptionProperties(subscriptionProperties);
+ }).thenRun(() -> {
+ log.info("[{}][{}] Updated subscription {}", clientAppId(), topicName, subName);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ // If the exception is not redirect exception we need to log it.
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to update subscription {} {}", clientAppId(), topicName, subName, cause);
+ }
+ asyncResponse.resume(new RestException(cause));
+ return null;
+ });
+ }
+
protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
String subName, boolean authoritative) {
CompletableFuture<Void> future;
@@ -2276,6 +2302,84 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+ protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse, String subName,
+ Map<String, String> subscriptionProperties,
+ boolean authoritative) {
+ CompletableFuture<Void> future;
+ if (topicName.isGlobal()) {
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ future = CompletableFuture.completedFuture(null);
+ }
+
+ future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
+ if (topicName.isPartitioned()) {
+ internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
+ subscriptionProperties, authoritative);
+ } else {
+ getPartitionedTopicMetadataAsync(topicName,
+ authoritative, false).thenAcceptAsync(partitionMetadata -> {
+ if (partitionMetadata.partitions > 0) {
+ final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ TopicName topicNamePartition = topicName.getPartition(i);
+ try {
+ futures.add(pulsar().getAdminClient().topics()
+ .updateSubscriptionPropertiesAsync(topicNamePartition.toString(),
+ subName, subscriptionProperties));
+ } catch (Exception e) {
+ log.error("[{}] Failed to update properties for subscription {} {}",
+ clientAppId(), topicNamePartition, subName,
+ e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ }
+
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ if (t instanceof NotFoundException) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ return null;
+ } else if (t instanceof PreconditionFailedException) {
+ asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+ "Subscription has active connected consumers"));
+ return null;
+ } else {
+ log.error("[{}] Failed to update properties for subscription {} {}",
+ clientAppId(), topicName, subName, t);
+ asyncResponse.resume(new RestException(t));
+ return null;
+ }
+ }
+
+ asyncResponse.resume(Response.noContent().build());
+ return null;
+ });
+ } else {
+ internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
+ subscriptionProperties, authoritative);
+ }
+ }, pulsar().getExecutor()).exceptionally(ex -> {
+ log.error("[{}] Failed to update properties for subscription {} from topic {}",
+ clientAppId(), subName, topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+ }).exceptionally(ex -> {
+ // If the exception is not redirect exception we need to log it.
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to update subscription {} from topic {}",
+ clientAppId(), subName, topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
CompletableFuture<Void> ret;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index fd5cb75b10b..754f54ca515 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1464,6 +1464,43 @@ public class PersistentTopics extends PersistentTopicsBase {
}
}
+ @PUT
+ @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/properties")
+ @ApiOperation(value = "Replaces all the properties on the given subscription")
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ + "subscriber is not authorized to access this operation"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
+ @ApiResponse(code = 405, message = "Method Not Allowed"),
+ @ApiResponse(code = 500, message = "Internal server error"),
+ @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
+ })
+ public void updateSubscriptionProperties(
+ @Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Subscription to update", required = true)
+ @PathParam("subName") String encodedSubName,
+ @ApiParam(value = "The new properties") Map<String, String> subscriptionProperties,
+ @ApiParam(value = "Is authentication required to perform this operation")
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalUpdateSubscriptionProperties(asyncResponse, decode(encodedSubName),
+ subscriptionProperties, authoritative);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+
@POST
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor")
@ApiOperation(value = "Reset subscription to message position closest to given position.",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
index 8d25c61e1dd..f7af28ddf41 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
@@ -174,5 +174,50 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
}
+ // clear the properties on subscriptionName
+ admin.topics().updateSubscriptionProperties(topic, subscriptionName, new HashMap<>());
+
+ if (partitioned) {
+ PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic);
+ for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
+ SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
+ .getSubscriptions().get(subscriptionName);
+ assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+ }
+
+ // aggregated properties
+ SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false)
+ .getSubscriptions().get(subscriptionName);
+ assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+
+ } else {
+ SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+ assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+ }
+
+ // update the properties on subscriptionName
+ admin.topics().updateSubscriptionProperties(topic, subscriptionName, properties);
+
+ if (partitioned) {
+ PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic);
+ for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
+ SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
+ .getSubscriptions().get(subscriptionName);
+ assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+ }
+
+ // aggregated properties
+ SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false)
+ .getSubscriptions().get(subscriptionName);
+ assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+
+ } else {
+ SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+ assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+
+ SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2);
+ assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
+ }
+
}
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 7b533030961..eab3b8041f3 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1737,6 +1737,17 @@ public interface Topics {
*/
void resetCursor(String topic, String subName, MessageId messageId, boolean isExcluded) throws PulsarAdminException;
+ /**
+ * Update Subscription Properties on a topic subscription.
+ * The new properties will override the existing values, properties that are not passed will be removed.
+ * @param topic
+ * @param subName
+ * @param subscriptionProperties
+ * @throws PulsarAdminException
+ */
+ void updateSubscriptionProperties(String topic, String subName, Map<String, String> subscriptionProperties)
+ throws PulsarAdminException;
+
/**
* Reset cursor position on a topic subscription.
*
@@ -1761,6 +1772,16 @@ public interface Topics {
*/
CompletableFuture<Void> resetCursorAsync(String topic, String subName, MessageId messageId, boolean isExcluded);
+ /**
+ * Update Subscription Properties on a topic subscription.
+ * The new properties will override the existing values, properties that are not passed will be removed.
+ * @param topic
+ * @param subName
+ * @param subscriptionProperties
+ */
+ CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName,
+ Map<String, String> subscriptionProperties);
+
/**
* Reset cursor position on a topic subscription.
*
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 1667cc4f90a..a5f5edae290 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1147,6 +1147,25 @@ public class TopicsImpl extends BaseResource implements Topics {
sync(() -> resetCursorAsync(topic, subName, messageId));
}
+ @Override
+ public void updateSubscriptionProperties(String topic, String subName, Map<String, String> subscriptionProperties)
+ throws PulsarAdminException {
+ sync(() -> updateSubscriptionPropertiesAsync(topic, subName, subscriptionProperties));
+ }
+
+ @Override
+ public CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName,
+ Map<String, String> subscriptionProperties) {
+ TopicName tn = validateTopic(topic);
+ String encodedSubName = Codec.encode(subName);
+ WebTarget path = topicPath(tn, "subscription", encodedSubName,
+ "properties");
+ if (subscriptionProperties == null) {
+ subscriptionProperties = new HashMap<>();
+ }
+ return asyncPutRequest(path, Entity.entity(subscriptionProperties, MediaType.APPLICATION_JSON));
+ }
+
@Override
public void resetCursor(String topic, String subName, MessageId messageId
, boolean isExcluded) throws PulsarAdminException {
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 23ad1a04f84..ca129e54971 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1418,6 +1418,22 @@ public class PulsarAdminToolTest {
props.put("a", "b");
verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false, props);
+ cmdTopics = new CmdTopics(() -> admin);
+ cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest -r"));
+ verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, true, null);
+
+ cmdTopics = new CmdTopics(() -> admin);
+ cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 --clear"));
+ verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", new HashMap<>());
+
+ cmdTopics = new CmdTopics(() -> admin);
+ props = new HashMap<>();
+ props.put("a", "b");
+ props.put("c", "d");
+ cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 -p a=b -p c=d"));
+ verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", props);
+
+ cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32"));
verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 883617f89b9..7c3736616e9 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -98,6 +98,7 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("subscriptions", new ListSubscriptions());
jcommander.addCommand("unsubscribe", new DeleteSubscription());
jcommander.addCommand("create-subscription", new CreateSubscription());
+ jcommander.addCommand("update-subscription-properties", new UpdateSubscriptionProperties());
jcommander.addCommand("stats", new GetStats());
jcommander.addCommand("stats-internal", new GetInternalStats());
@@ -919,6 +920,41 @@ public class CmdTopics extends CmdBase {
}
}
+ @Parameters(commandDescription = "Update the properties of a subscription on a topic")
+ private class UpdateSubscriptionProperties extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "-s",
+ "--subscription" }, description = "Subscription to update", required = true)
+ private String subscriptionName;
+
+ @Parameter(names = {"--property", "-p"}, description = "key value pair properties(-p a=b -p c=d)",
+ required = false)
+ private java.util.List<String> properties;
+
+ @Parameter(names = {"--clear", "-c"}, description = "Remove all properties",
+ required = false)
+ private boolean clear;
+
+ @Override
+ void run() throws Exception {
+ String topic = validateTopicName(params);
+ Map<String, String> map = parseListKeyValueMap(properties);
+ if (map == null) {
+ map = Collections.emptyMap();
+ }
+ if ((map.isEmpty()) && !clear) {
+ throw new ParameterException("If you want to clear the properties you have to use --clear");
+ }
+ if (clear && !map.isEmpty()) {
+ throw new ParameterException("If you set --clear then you should not pass any properties");
+ }
+ getTopics().updateSubscriptionProperties(topic, subscriptionName, map);
+ }
+ }
+
+
@Parameters(commandDescription = "Reset position for subscription to a position that is closest to "
+ "timestamp or messageId.")
private class ResetCursor extends CliCommand {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index b0da409e0fe..a1e417ca547 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -174,7 +174,7 @@ public class CLITest extends PulsarTestSuite {
}
@Test
- public void testCreateSubscriptionWithPropertiesCommand() throws Exception {
+ public void testCreateUpdateSubscriptionWithPropertiesCommand() throws Exception {
String topic = "testCreateSubscriptionCommmand";
String subscriptionPrefix = "subscription-";
@@ -194,6 +194,29 @@ public class CLITest extends PulsarTestSuite {
"" + subscriptionPrefix + i
);
result.assertNoOutput();
+
+ ContainerExecResult resultUpdate = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "topics",
+ "update-subscription-properties",
+ "-p",
+ "a=e",
+ "persistent://public/default/" + topic,
+ "--subscription",
+ "" + subscriptionPrefix + i
+ );
+ resultUpdate.assertNoOutput();
+
+ ContainerExecResult resultClear = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "topics",
+ "update-subscription-properties",
+ "-c",
+ "persistent://public/default/" + topic,
+ "--subscription",
+ "" + subscriptionPrefix + i
+ );
+ resultClear.assertNoOutput();
i++;
}
}