You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/26 04:05:02 UTC
[pulsar] branch master updated: Support get topic applied policy
for DelayedDeliveryPolicies (#9245)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b9fe5d1 Support get topic applied policy for DelayedDeliveryPolicies (#9245)
b9fe5d1 is described below
commit b9fe5d18fd7a3cb9c58bcdb0a9996a562481966a
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Tue Jan 26 12:04:28 2021 +0800
Support get topic applied policy for DelayedDeliveryPolicies (#9245)
Master Issue: #9216
### Modifications
add applied API for client
### Verifying this change
unit test:
adminApiDelayedDelivery.testNamespaceDelayedDeliveryPolicyApi
adminApiDelayedDelivery.testDelayedDeliveryApplied
---
.../broker/admin/impl/PersistentTopicsBase.java | 19 +++++++
.../apache/pulsar/broker/admin/v2/Namespaces.java | 11 ++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 21 ++++---
.../broker/admin/AdminApiDelayedDelivery.java | 65 ++++++++++++++++++++++
.../org/apache/pulsar/client/admin/Namespaces.java | 13 +++++
.../org/apache/pulsar/client/admin/Topics.java | 18 ++++++
.../client/admin/internal/NamespacesImpl.java | 22 ++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 19 ++++++-
.../pulsar/admin/cli/PulsarAdminToolTest.java | 14 +++++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 13 +++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 5 +-
11 files changed, 208 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8e1dfdd..82c312b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -827,6 +827,25 @@ public class PersistentTopicsBase extends AdminResource {
}
}
+ protected CompletableFuture<DelayedDeliveryPolicies> internalGetDelayedDeliveryPolicies(boolean applied) {
+ TopicPolicies policies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
+ DelayedDeliveryPolicies delayedDeliveryPolicies = null;
+ if (policies.isDelayedDeliveryEnabledSet() && policies.isDelayedDeliveryTickTimeMillisSet()) {
+ delayedDeliveryPolicies = new DelayedDeliveryPolicies(
+ policies.getDelayedDeliveryTickTimeMillis(),
+ policies.getDelayedDeliveryEnabled());
+ }
+ if (delayedDeliveryPolicies == null && applied) {
+ delayedDeliveryPolicies = getNamespacePolicies(namespaceName).delayed_delivery_policies;
+ if (delayedDeliveryPolicies == null) {
+ delayedDeliveryPolicies = new DelayedDeliveryPolicies(
+ pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis(),
+ pulsar().getConfiguration().isDelayedDeliveryEnabled());
+ }
+ }
+ return CompletableFuture.completedFuture(delayedDeliveryPolicies);
+ }
+
protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPolicies offloadPolicies) {
TopicPolicies topicPolicies = null;
try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index e2ec675..e1f7093 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -905,6 +905,17 @@ public class Namespaces extends NamespacesBase {
internalSetDelayedDelivery(deliveryPolicies);
}
+ @DELETE
+ @Path("/{tenant}/{namespace}/delayedDelivery")
+ @ApiOperation(value = "Delete delayed delivery messages config on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), })
+ public void removeDelayedDeliveryPolicies(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ internalSetDelayedDelivery(null);
+ }
+
@GET
@Path("/{tenant}/{namespace}/inactiveTopicPolicies")
@ApiOperation(value = "Get inactive topic policies config on a namespace.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 6a16b98..ab3e8a5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -618,15 +618,20 @@ public class PersistentTopics extends PersistentTopicsBase {
public void getDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic) {
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("applied") boolean applied) {
validateTopicName(tenant, namespace, encodedTopic);
- TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
- if (topicPolicies.isDelayedDeliveryEnabledSet() && topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
- asyncResponse.resume(new DelayedDeliveryPolicies(topicPolicies.getDelayedDeliveryTickTimeMillis()
- , topicPolicies.getDelayedDeliveryEnabled()));
- } else {
- asyncResponse.resume(Response.noContent().build());
- }
+ internalGetDelayedDeliveryPolicies(applied).whenComplete((res, ex) -> {
+ if (ex instanceof RestException) {
+ log.error("Failed get DelayedDeliveryPolicies", ex);
+ asyncResponse.resume(ex);
+ } else if (ex != null) {
+ log.error("Failed get DelayedDeliveryPolicies", ex);
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ asyncResponse.resume(res);
+ }
+ });
}
@POST
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java
index 045d973..84482ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java
@@ -23,6 +23,7 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -35,6 +36,7 @@ import org.testng.annotations.Test;
import java.util.Set;
import java.util.TreeSet;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.testng.Assert.*;
@@ -115,4 +117,67 @@ public class AdminApiDelayedDelivery extends MockedPulsarServiceBaseTest {
assertTrue(delayedMessages.contains("delayed-msg-" + i));
}
}
+
+ @Test(timeOut = 30000)
+ public void testNamespaceDelayedDeliveryPolicyApi() throws Exception {
+ final String namespace = "delayed-delivery-messages/my-ns";
+ admin.namespaces().createNamespace(namespace);
+ assertNull(admin.namespaces().getDelayedDelivery(namespace));
+ DelayedDeliveryPolicies delayedDeliveryPolicies = new DelayedDeliveryPolicies(3, true);
+ admin.namespaces().setDelayedDeliveryMessages(namespace, delayedDeliveryPolicies);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
+ -> assertEquals(admin.namespaces().getDelayedDelivery(namespace), delayedDeliveryPolicies));
+
+ admin.namespaces().removeDelayedDeliveryMessages(namespace);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
+ -> assertNull(admin.namespaces().getDelayedDelivery(namespace)));
+ }
+
+ @Test(timeOut = 30000)
+ public void testDelayedDeliveryApplied() throws Exception {
+ cleanup();
+ conf.setSystemTopicEnabled(true);
+ conf.setTopicLevelPoliciesEnabled(true);
+ setup();
+ final String namespace = "delayed-delivery-messages/my-ns";
+ final String topic = "persistent://" + namespace + "/test" + UUID.randomUUID();
+ admin.namespaces().createNamespace(namespace);
+ pulsarClient.newProducer().topic(topic).create().close();
+ Awaitility.await().atMost(3, TimeUnit.SECONDS)
+ .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+ //namespace-level default value is null
+ assertNull(admin.namespaces().getDelayedDelivery(namespace));
+ //topic-level default value is null
+ assertNull(admin.topics().getDelayedDeliveryPolicy(topic));
+ //use broker-level by default
+ DelayedDeliveryPolicies brokerLevelPolicy =
+ new DelayedDeliveryPolicies(conf.getDelayedDeliveryTickTimeMillis(),
+ conf.isDelayedDeliveryEnabled());
+ assertEquals(admin.topics().getDelayedDeliveryPolicy(topic, true), brokerLevelPolicy);
+ //set namespace-level policy
+ DelayedDeliveryPolicies namespaceLevelPolicy =
+ new DelayedDeliveryPolicies(100, true);
+ admin.namespaces().setDelayedDeliveryMessages(namespace, namespaceLevelPolicy);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
+ -> assertNotNull(admin.namespaces().getDelayedDelivery(namespace)));
+ DelayedDeliveryPolicies policyFromBroker = admin.topics().getDelayedDeliveryPolicy(topic, true);
+ assertEquals(policyFromBroker.getTickTime(), 100);
+ assertTrue(policyFromBroker.isActive());
+ // set topic-level policy
+ DelayedDeliveryPolicies topicLevelPolicy = new DelayedDeliveryPolicies(200, true);
+ admin.topics().setDelayedDeliveryPolicy(topic, topicLevelPolicy);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
+ -> assertNotNull(admin.topics().getDelayedDeliveryPolicy(topic)));
+ policyFromBroker = admin.topics().getDelayedDeliveryPolicy(topic, true);
+ assertEquals(policyFromBroker.getTickTime(), 200);
+ assertTrue(policyFromBroker.isActive());
+ //remove topic-level policy
+ admin.topics().removeDelayedDeliveryPolicy(topic); ;
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
+ -> assertEquals(admin.topics().getDelayedDeliveryPolicy(topic, true), namespaceLevelPolicy));
+ //remove namespace-level policy
+ admin.namespaces().removeDelayedDeliveryMessages(namespace);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
+ -> assertEquals(admin.topics().getDelayedDeliveryPolicy(topic, true), brokerLevelPolicy));
+ }
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index b9a863f..2af5751 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -2339,6 +2339,19 @@ public interface Namespaces {
String namespace, DelayedDeliveryPolicies delayedDeliveryPolicies);
/**
+ * Remove the delayed delivery messages for all topics within a namespace.
+ * @param namespace
+ * @throws PulsarAdminException
+ */
+ void removeDelayedDeliveryMessages(String namespace) throws PulsarAdminException;
+ /**
+ * Remove the delayed delivery messages for all topics within a namespace asynchronously.
+ * @param namespace
+ * @return
+ */
+ CompletableFuture<Void> removeDelayedDeliveryMessagesAsync(String namespace);
+
+ /**
* Get the inactive deletion strategy for all topics within a namespace synchronously.
* @param namespace
* @return
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 2e9c38d..9f17f50 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1530,6 +1530,24 @@ public interface Topics {
void removeBacklogQuota(String topic) throws PulsarAdminException;
/**
+ * Get the delayed delivery policy applied for a specified topic.
+ * @param topic
+ * @param applied
+ * @return
+ * @throws PulsarAdminException
+ */
+ DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic
+ , boolean applied) throws PulsarAdminException;
+
+ /**
+ * Get the delayed delivery policy applied for a specified topic asynchronously.
+ * @param topic
+ * @param applied
+ * @return
+ */
+ CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryPolicyAsync(String topic
+ , boolean applied);
+ /**
* Get the delayed delivery policy for a specified topic.
* @param topic
* @return
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index fd015bd..f0e2e96 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -1913,6 +1913,28 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
+ public void removeDelayedDeliveryMessages(String namespace) throws PulsarAdminException {
+ try {
+ removeDelayedDeliveryMessagesAsync(namespace)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeDelayedDeliveryMessagesAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "delayedDelivery");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public InactiveTopicPolicies getInactiveTopicPolicies(String namespace) throws PulsarAdminException {
try {
return getInactiveTopicPoliciesAsync(namespace).
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 6240773..ab60d8a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1687,9 +1687,10 @@ public class TopicsImpl extends BaseResource implements Topics {
}
@Override
- public DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic) throws PulsarAdminException {
+ public DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic
+ , boolean applied) throws PulsarAdminException {
try {
- return getDelayedDeliveryPolicyAsync(topic).
+ return getDelayedDeliveryPolicyAsync(topic, applied).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
@@ -1702,9 +1703,11 @@ public class TopicsImpl extends BaseResource implements Topics {
}
@Override
- public CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryPolicyAsync(String topic) {
+ public CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryPolicyAsync(String topic
+ , boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "delayedDelivery");
+ path = path.queryParam("applied", applied);
final CompletableFuture<DelayedDeliveryPolicies> future = new CompletableFuture<>();
asyncGetRequest(path, new InvocationCallback<DelayedDeliveryPolicies>() {
@Override
@@ -1721,6 +1724,16 @@ public class TopicsImpl extends BaseResource implements Topics {
}
@Override
+ public DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic) throws PulsarAdminException {
+ return getDelayedDeliveryPolicy(topic, false);
+ }
+
+ @Override
+ public CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryPolicyAsync(String topic) {
+ return getDelayedDeliveryPolicyAsync(topic, false);
+ }
+
+ @Override
public CompletableFuture<Void> removeDelayedDeliveryPolicyAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "delayedDelivery");
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 93aea66..8dc3f9d 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -425,6 +425,9 @@ public class PulsarAdminToolTest {
namespaces.run(split("get-delayed-delivery myprop/clust/ns1"));
verify(mockNamespaces).getDelayedDelivery("myprop/clust/ns1");
+ namespaces.run(split("remove-delayed-delivery myprop/clust/ns1"));
+ verify(mockNamespaces).removeDelayedDeliveryMessages("myprop/clust/ns1");
+
namespaces.run(split("set-inactive-topic-policies myprop/clust/ns1 -e -t 1s -m delete_when_no_subscriptions"));
verify(mockNamespaces).setInactiveTopicPolicies("myprop/clust/ns1"
, new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1,true));
@@ -792,6 +795,14 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("remove-offload-policies persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeOffloadPolicies("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopics).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", false);
+ cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable"));
+ verify(mockTopics).setDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1",
+ new DelayedDeliveryPolicies(10000, true));
+ cmdTopics.run(split("remove-delayed-delivery persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopics).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1") ;
+
cmdTopics.run(split(
"set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first"));
OffloadPolicies offloadPolicies = OffloadPolicies.create("s3", "region", "bucket"
@@ -884,6 +895,9 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1", true);
+
+ cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1 --applied"));
+ verify(mockTopics).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", true);
}
@Test
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 6b39193..1fafbc0 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1097,6 +1097,18 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Remove delayed delivery policies from a namespace")
+ private class RemoveDelayedDelivery extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().removeDelayedDeliveryMessages(namespace);
+ }
+ }
+
@Parameters(commandDescription = "Get the inactive topic policy for a namespace")
private class GetInactiveTopicPolicies extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
@@ -1996,6 +2008,7 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("set-delayed-delivery", new SetDelayedDelivery());
jcommander.addCommand("get-delayed-delivery", new GetDelayedDelivery());
+ jcommander.addCommand("remove-delayed-delivery", new RemoveDelayedDelivery());
jcommander.addCommand("get-inactive-topic-policies", new GetInactiveTopicPolicies());
jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index b4783d1..11ba768 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -1029,10 +1029,13 @@ public class CmdTopics extends CmdBase {
@Parameter(description = "tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
+ @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
+ private boolean applied = false;
+
@Override
void run() throws PulsarAdminException {
String topicName = validateTopicName(params);
- print(admin.topics().getDelayedDeliveryPolicy(topicName));
+ print(admin.topics().getDelayedDeliveryPolicy(topicName, applied));
}
}