You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/26 04:05:02 UTC

[pulsar] branch master updated: Support get topic applied policy for DelayedDeliveryPolicies (#9245)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b9fe5d1  Support get topic applied policy for DelayedDeliveryPolicies (#9245)
b9fe5d1 is described below

commit b9fe5d18fd7a3cb9c58bcdb0a9996a562481966a
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Tue Jan 26 12:04:28 2021 +0800

    Support get topic applied policy for DelayedDeliveryPolicies (#9245)
    
    Master Issue: #9216
    
    ### Modifications
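    Add an "applied" option to the topic-level get-delayed-delivery API (broker REST endpoint, admin client and CLI) so that callers can fetch the effective policy, and add a namespace-level remove-delayed-delivery API.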
    
    ### Verifying this change
    unit test:
    AdminApiDelayedDelivery.testNamespaceDelayedDeliveryPolicyApi
    AdminApiDelayedDelivery.testDelayedDeliveryApplied
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 19 +++++++
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 11 ++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 21 ++++---
 .../broker/admin/AdminApiDelayedDelivery.java      | 65 ++++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Namespaces.java | 13 +++++
 .../org/apache/pulsar/client/admin/Topics.java     | 18 ++++++
 .../client/admin/internal/NamespacesImpl.java      | 22 ++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 19 ++++++-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      | 14 +++++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 13 +++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  5 +-
 11 files changed, 208 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8e1dfdd..82c312b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -827,6 +827,25 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    /** Returns the topic-level policy; when {@code applied}, falls back to namespace then broker settings. */
+    protected CompletableFuture<DelayedDeliveryPolicies> internalGetDelayedDeliveryPolicies(boolean applied) {
+        TopicPolicies topicLevel = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
+        if (topicLevel.isDelayedDeliveryEnabledSet() && topicLevel.isDelayedDeliveryTickTimeMillisSet()) {
+            return CompletableFuture.completedFuture(new DelayedDeliveryPolicies(
+                    topicLevel.getDelayedDeliveryTickTimeMillis(),
+                    topicLevel.getDelayedDeliveryEnabled()));
+        }
+        if (!applied) {
+            // No complete topic-level policy and no fallback requested: complete with null.
+            return CompletableFuture.completedFuture(null);
+        }
+        // Namespace-level policy wins over the broker configuration when it is present.
+        DelayedDeliveryPolicies nsLevel = getNamespacePolicies(namespaceName).delayed_delivery_policies;
+        return CompletableFuture.completedFuture(nsLevel != null ? nsLevel : new DelayedDeliveryPolicies(
+                pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis(),
+                pulsar().getConfiguration().isDelayedDeliveryEnabled()));
+    }
+
     protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPolicies offloadPolicies) {
         TopicPolicies topicPolicies = null;
         try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index e2ec675..e1f7093 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -905,6 +905,17 @@ public class Namespaces extends NamespacesBase {
         internalSetDelayedDelivery(deliveryPolicies);
     }
 
+    @DELETE
+    @Path("/{tenant}/{namespace}/delayedDelivery")
+    @ApiOperation(value = "Delete delayed delivery messages config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), })
+    public void removeDelayedDeliveryPolicies(@PathParam("tenant") String tenant,
+                                           @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace); // presumably sets the namespace used below - confirm
+        internalSetDelayedDelivery(null); // removal is expressed as setting a null policy
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/inactiveTopicPolicies")
     @ApiOperation(value = "Get inactive topic policies config on a namespace.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 6a16b98..ab3e8a5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -618,15 +618,20 @@ public class PersistentTopics extends PersistentTopicsBase {
     public void getDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
                                            @PathParam("tenant") String tenant,
                                            @PathParam("namespace") String namespace,
-                                           @PathParam("topic") @Encoded String encodedTopic) {
+                                           @PathParam("topic") @Encoded String encodedTopic,
+                                           @QueryParam("applied") boolean applied) {
         validateTopicName(tenant, namespace, encodedTopic);
-        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
-        if (topicPolicies.isDelayedDeliveryEnabledSet() && topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
-            asyncResponse.resume(new DelayedDeliveryPolicies(topicPolicies.getDelayedDeliveryTickTimeMillis()
-                    , topicPolicies.getDelayedDeliveryEnabled()));
-        } else {
-            asyncResponse.resume(Response.noContent().build());
-        }
+        internalGetDelayedDeliveryPolicies(applied).whenComplete((res, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed get DelayedDeliveryPolicies", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed get DelayedDeliveryPolicies", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                asyncResponse.resume(res);
+            }
+        });
     }
 
     @POST
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java
index 045d973..84482ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java
@@ -23,6 +23,7 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -35,6 +36,7 @@ import org.testng.annotations.Test;
 
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.testng.Assert.*;
@@ -115,4 +117,67 @@ public class AdminApiDelayedDelivery extends MockedPulsarServiceBaseTest {
             assertTrue(delayedMessages.contains("delayed-msg-" + i));
         }
     }
+
+    @Test(timeOut = 30000)
+    public void testNamespaceDelayedDeliveryPolicyApi() throws Exception {
+        // Namespace-level round trip: unset -> set -> get -> remove -> unset again.
+        final String ns = "delayed-delivery-messages/my-ns";
+        admin.namespaces().createNamespace(ns);
+        assertNull(admin.namespaces().getDelayedDelivery(ns));
+        final DelayedDeliveryPolicies expected = new DelayedDeliveryPolicies(3, true);
+        admin.namespaces().setDelayedDeliveryMessages(ns, expected);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(
+                () -> assertEquals(admin.namespaces().getDelayedDelivery(ns), expected));
+        admin.namespaces().removeDelayedDeliveryMessages(ns);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(
+                () -> assertNull(admin.namespaces().getDelayedDelivery(ns)));
+    }
+
+    @Test(timeOut = 30000)
+    public void testDelayedDeliveryApplied() throws Exception {
+        // Checks the fallback chain of the "applied" query: topic-level, then namespace-level, then broker-level.
+        // Re-run setup with system topics and topic-level policies enabled.
+        cleanup();
+        conf.setSystemTopicEnabled(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+        setup();
+        final String namespace = "delayed-delivery-messages/my-ns";
+        final String topic = "persistent://" + namespace + "/test" + UUID.randomUUID();
+        admin.namespaces().createNamespace(namespace);
+        pulsarClient.newProducer().topic(topic).create().close();
+        Awaitility.await().atMost(3, TimeUnit.SECONDS)
+                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        // namespace-level default value is null
+        assertNull(admin.namespaces().getDelayedDelivery(namespace));
+        // topic-level default value is null
+        assertNull(admin.topics().getDelayedDeliveryPolicy(topic));
+        // with nothing set, the applied policy comes from the broker configuration
+        DelayedDeliveryPolicies brokerLevelPolicy = new DelayedDeliveryPolicies(
+                conf.getDelayedDeliveryTickTimeMillis(), conf.isDelayedDeliveryEnabled());
+        assertEquals(admin.topics().getDelayedDeliveryPolicy(topic, true), brokerLevelPolicy);
+        // set namespace-level policy
+        DelayedDeliveryPolicies namespaceLevelPolicy = new DelayedDeliveryPolicies(100, true);
+        admin.namespaces().setDelayedDeliveryMessages(namespace, namespaceLevelPolicy);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
+                -> assertNotNull(admin.namespaces().getDelayedDelivery(namespace)));
+        DelayedDeliveryPolicies policyFromBroker = admin.topics().getDelayedDeliveryPolicy(topic, true);
+        assertEquals(policyFromBroker.getTickTime(), 100);
+        assertTrue(policyFromBroker.isActive());
+        // set topic-level policy
+        DelayedDeliveryPolicies topicLevelPolicy = new DelayedDeliveryPolicies(200, true);
+        admin.topics().setDelayedDeliveryPolicy(topic, topicLevelPolicy);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
+                -> assertNotNull(admin.topics().getDelayedDeliveryPolicy(topic)));
+        policyFromBroker = admin.topics().getDelayedDeliveryPolicy(topic, true);
+        assertEquals(policyFromBroker.getTickTime(), 200);
+        assertTrue(policyFromBroker.isActive());
+        // remove topic-level policy: the applied policy falls back to the namespace level
+        admin.topics().removeDelayedDeliveryPolicy(topic);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
+                -> assertEquals(admin.topics().getDelayedDeliveryPolicy(topic, true), namespaceLevelPolicy));
+        // remove namespace-level policy: the applied policy falls back to the broker level
+        admin.namespaces().removeDelayedDeliveryMessages(namespace);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
+                -> assertEquals(admin.topics().getDelayedDeliveryPolicy(topic, true), brokerLevelPolicy));
+    }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index b9a863f..2af5751 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -2339,6 +2339,19 @@ public interface Namespaces {
             String namespace, DelayedDeliveryPolicies delayedDeliveryPolicies);
 
     /**
+     * Remove the delayed delivery policy configured on a namespace.
+     * @param namespace the namespace name
+     * @throws PulsarAdminException if the removal request fails
+     */
+    void removeDelayedDeliveryMessages(String namespace) throws PulsarAdminException;
+    /**
+     * Remove the delayed delivery policy configured on a namespace asynchronously.
+     * @param namespace the namespace name
+     * @return a future tracking the removal request
+     */
+    CompletableFuture<Void> removeDelayedDeliveryMessagesAsync(String namespace);
+
+    /**
      * Get the inactive deletion strategy for all topics within a namespace synchronously.
      * @param namespace
      * @return
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 2e9c38d..9f17f50 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1530,6 +1530,24 @@ public interface Topics {
     void removeBacklogQuota(String topic) throws PulsarAdminException;
 
     /**
+     * Get the delayed delivery policy for a specified topic, optionally resolving the applied policy.
+     * @param topic the topic name
+     * @param applied if true, fall back to the namespace-level policy and then the broker configuration
+     * @return the policy, or null when no topic-level policy is set and {@code applied} is false
+     * @throws PulsarAdminException if the request fails
+     */
+    DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic
+            , boolean applied) throws PulsarAdminException;
+
+    /**
+     * Get the delayed delivery policy for a specified topic asynchronously, optionally resolving the applied one.
+     * @param topic the topic name
+     * @param applied if true, fall back to the namespace-level policy and then the broker configuration
+     * @return a future completed with the policy returned by the broker
+     */
+    CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryPolicyAsync(String topic
+            , boolean applied);
+    /**
      * Get the delayed delivery policy for a specified topic.
      * @param topic
      * @return
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index fd015bd..f0e2e96 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -1913,6 +1913,28 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     }
 
     @Override
+    public void removeDelayedDeliveryMessages(String namespace) throws PulsarAdminException {
+        try {
+            // Synchronous wrapper: wait for the async variant, bounded by readTimeoutMs.
+            removeDelayedDeliveryMessagesAsync(namespace)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause(); // NOTE(review): assumes cause is a PulsarAdminException
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag before rethrowing
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeDelayedDeliveryMessagesAsync(String namespace) {
+        // Issue a DELETE against the namespace's "delayedDelivery" resource.
+        final WebTarget target = namespacePath(NamespaceName.get(namespace), "delayedDelivery");
+        return asyncDeleteRequest(target);
+    }
+
+    @Override
     public InactiveTopicPolicies getInactiveTopicPolicies(String namespace) throws PulsarAdminException {
         try {
             return getInactiveTopicPoliciesAsync(namespace).
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 6240773..ab60d8a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1687,9 +1687,10 @@ public class TopicsImpl extends BaseResource implements Topics {
     }
 
     @Override
-    public DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic) throws PulsarAdminException {
+    public DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic
+            , boolean applied) throws PulsarAdminException {
         try {
-            return getDelayedDeliveryPolicyAsync(topic).
+            return getDelayedDeliveryPolicyAsync(topic, applied).
                     get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
@@ -1702,9 +1703,11 @@ public class TopicsImpl extends BaseResource implements Topics {
     }
 
     @Override
-    public CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryPolicyAsync(String topic) {
+    public CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryPolicyAsync(String topic
+            , boolean applied) {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "delayedDelivery");
+        path = path.queryParam("applied", applied);
         final CompletableFuture<DelayedDeliveryPolicies> future = new CompletableFuture<>();
         asyncGetRequest(path, new InvocationCallback<DelayedDeliveryPolicies>() {
             @Override
@@ -1721,6 +1724,16 @@ public class TopicsImpl extends BaseResource implements Topics {
     }
 
     @Override
+    public DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic) throws PulsarAdminException {
+        return getDelayedDeliveryPolicy(topic, false); // applied=false: delegate without requesting the fallback
+    }
+
+    @Override
+    public CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryPolicyAsync(String topic) {
+        return getDelayedDeliveryPolicyAsync(topic, false); // applied=false: delegate without the fallback
+    }
+
+    @Override
     public CompletableFuture<Void> removeDelayedDeliveryPolicyAsync(String topic) {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "delayedDelivery");
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 93aea66..8dc3f9d 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -425,6 +425,9 @@ public class PulsarAdminToolTest {
         namespaces.run(split("get-delayed-delivery myprop/clust/ns1"));
         verify(mockNamespaces).getDelayedDelivery("myprop/clust/ns1");
 
+        namespaces.run(split("remove-delayed-delivery myprop/clust/ns1"));
+        verify(mockNamespaces).removeDelayedDeliveryMessages("myprop/clust/ns1");
+
         namespaces.run(split("set-inactive-topic-policies myprop/clust/ns1 -e -t 1s -m delete_when_no_subscriptions"));
         verify(mockNamespaces).setInactiveTopicPolicies("myprop/clust/ns1"
                 , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1,true));
@@ -792,6 +795,14 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("remove-offload-policies persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).removeOffloadPolicies("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", false);
+        cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable"));
+        verify(mockTopics).setDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1",
+                new DelayedDeliveryPolicies(10000, true));
+        cmdTopics.run(split("remove-delayed-delivery persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1") ;
+
         cmdTopics.run(split(
                 "set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first"));
         OffloadPolicies offloadPolicies = OffloadPolicies.create("s3", "region", "bucket"
@@ -884,6 +895,9 @@ public class PulsarAdminToolTest {
 
         cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -ap"));
         verify(mockTopics).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1", true);
+
+        cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1 --applied"));
+        verify(mockTopics).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", true);
     }
 
     @Test
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 6b39193..1fafbc0 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1097,6 +1097,18 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Remove delayed delivery policies from a namespace")
+    private class RemoveDelayedDelivery extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            // Validate the positional argument, then remove the namespace-level policy.
+            admin.namespaces().removeDelayedDeliveryMessages(validateNamespace(params));
+        }
+    }
+
     @Parameters(commandDescription = "Get the inactive topic policy for a namespace")
     private class GetInactiveTopicPolicies extends CliCommand {
         @Parameter(description = "tenant/namespace\n", required = true)
@@ -1996,6 +2008,7 @@ public class CmdNamespaces extends CmdBase {
 
         jcommander.addCommand("set-delayed-delivery", new SetDelayedDelivery());
         jcommander.addCommand("get-delayed-delivery", new GetDelayedDelivery());
+        jcommander.addCommand("remove-delayed-delivery", new RemoveDelayedDelivery());
 
         jcommander.addCommand("get-inactive-topic-policies", new GetInactiveTopicPolicies());
         jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index b4783d1..11ba768 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -1029,10 +1029,13 @@ public class CmdTopics extends CmdBase {
         @Parameter(description = "tenant/namespace/topic\n", required = true)
         private java.util.List<String> params;
 
+        @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
+        private boolean applied = false;
+
         @Override
         void run() throws PulsarAdminException {
             String topicName = validateTopicName(params);
-            print(admin.topics().getDelayedDeliveryPolicy(topicName));
+            print(admin.topics().getDelayedDeliveryPolicy(topicName, applied));
         }
     }