You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/08/12 00:41:29 UTC

[pulsar] branch master updated: support topic level delayed delivery policy (#7784)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e417d77  support topic level delayed delivery policy (#7784)
e417d77 is described below

commit e417d77a5035e93993ca2e1912282eddf315a6ca
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Wed Aug 12 08:41:10 2020 +0800

    support topic level delayed delivery policy (#7784)
    
    
    Master Issue: #2688
    
    ### Motivation
    Support a topic-level delayed delivery policy.
    
    ### Modifications
    Support setting, getting and removing the delayed delivery policy at the topic level.
    
    ### Verifying this change
    
    Added unit tests to verify that setting, getting and removing the delayed delivery policy at the topic level work as expected when the topic-level policy is enabled or disabled.
    
    - org.apache.pulsar.broker.service.persistent.DelayedDeliveryTest#testEnableAndDisableTopicDelayedDelivery
    - org.apache.pulsar.broker.service.persistent.DelayedDeliveryTest#testEnableTopicDelayedDelivery
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  26 ++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  57 +++++++++
 .../PersistentDispatcherMultipleConsumers.java     |   9 +-
 .../broker/service/persistent/PersistentTopic.java |  24 +++-
 .../service/persistent/DelayedDeliveryTest.java    | 135 +++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     |  48 ++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  80 ++++++++++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  56 +++++++++
 .../pulsar/common/policies/data/TopicPolicies.java |   9 ++
 9 files changed, 439 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 520a181..612c7a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -97,6 +97,7 @@ import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
@@ -579,6 +580,31 @@ public class PersistentTopicsBase extends AdminResource {
         });
     }
 
+    protected void internalSetDelayedDeliveryPolicies(AsyncResponse asyncResponse, DelayedDeliveryPolicies deliveryPolicies) {
+        TopicPolicies topicPolicies = null;
+        try {
+            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.error("Topic {} policies cache have not init.", topicName);
+            asyncResponse.resume(new RestException(e));
+            return; // the response has already been completed with the error
+        }
+        if (topicPolicies == null) { // nothing stored for this topic yet: start from empty policies
+            topicPolicies = new TopicPolicies();
+        } // below: a null payload clears both delayed-delivery fields
+        topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
+        topicPolicies.setDelayedDeliveryTickTimeMillis(deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
+        pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
+                .whenComplete((result, ex) -> {
+                    if (ex != null) {
+                        // Name the topic so the failure can be traced; the throwable stays the last argument.
+                        log.error("Failed set delayed delivery policy for topic {}", topicName, ex);
+                        asyncResponse.resume(new RestException(ex));
+                    } else asyncResponse.resume(Response.noContent().build()); // 204 once the update completes
+                });
+    }
+
     private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
         List<CompletableFuture<Void>> results = new ArrayList<>(clusters.size() -1);
         clusters.forEach(cluster -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 3b5b07f..0d48c5a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -45,11 +45,13 @@ import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.OffloadProcessStatus;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -248,6 +250,61 @@ public class PersistentTopics extends PersistentTopicsBase {
         internalCreateNonPartitionedTopic(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
+    @ApiOperation(value = "Get delayed delivery messages config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),})
+    public void getDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
+                                           @PathParam("tenant") String tenant,
+                                           @PathParam("namespace") String namespace,
+                                           @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        // A policy is returned only when both the enabled flag and the tick time are stored; otherwise 204.
+        final TopicPolicies stored = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        if (!stored.isDelayedDeliveryEnabledSet() || !stored.isDelayedDeliveryTickTimeMillisSet()) {
+            asyncResponse.resume(Response.noContent().build());
+            return;
+        }
+        asyncResponse.resume(new DelayedDeliveryPolicies(stored.getDelayedDeliveryTickTimeMillis(), stored.getDelayedDeliveryEnabled()));
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
+    @ApiOperation(value = "Set delayed delivery messages config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void setDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
+                                           @PathParam("tenant") String tenant,
+                                           @PathParam("namespace") String namespace,
+                                           @PathParam("topic") @Encoded String encodedTopic,
+                                           @ApiParam(value = "Delayed delivery policies for the specified topic") DelayedDeliveryPolicies deliveryPolicies) {
+        validateTopicName(tenant, namespace, encodedTopic); // the checks below run in this order before anything is written
+        validateAdminAccessForTenant(tenant);
+        validatePoliciesReadOnlyAccess();
+        checkTopicLevelPolicyEnable(); // NOTE(review): presumably rejects the call when topic-level policies are disabled - confirm
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        internalSetDelayedDeliveryPolicies(asyncResponse, deliveryPolicies); // completes asyncResponse; a null payload clears the policy
+    }
+
+
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
+    @ApiOperation(value = "Remove delayed delivery messages config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void deleteDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
+                                              @PathParam("tenant") String tenant,
+                                              @PathParam("namespace") String namespace,
+                                              @PathParam("topic") @Encoded String encodedTopic) {
+        // Removal reuses the setter, which validates the topic and the caller; a null payload clears the policy.
+        setDelayedDeliveryPolicies(asyncResponse, tenant, namespace, encodedTopic, null);
+    }
+
     /**
      * It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to be
      * already exist and number of new partitions must be greater than existing number of partitions. Decrementing
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index b8607f5..67c2df9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -336,7 +336,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 }
 
                 havePendingReplayRead = true;
-                Set<? extends Position> deletedMessages = topic.delayedDeliveryEnabled ?
+                Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled() ?
                         asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
                 // clear already acked positions from replay bucket
 
@@ -771,7 +771,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
     @Override
     public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
-        if (!topic.delayedDeliveryEnabled) {
+        if (!topic.isDelayedDeliveryEnabled()) {
             // If broker has the feature disabled, always deliver messages immediately
             return false;
         }
@@ -783,7 +783,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                         .of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
             }
 
-            delayedDeliveryTracker.get().resetTickTime(topic.delayedDeliveryTickTimeMillis);
+            delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
             return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime());
         }
     }
@@ -793,13 +793,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             return messagesToRedeliver.items(maxMessagesToRead,
                     (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId));
         } else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
-            delayedDeliveryTracker.get().resetTickTime(topic.delayedDeliveryTickTimeMillis);
+            delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
             return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
         } else {
             return Collections.emptySet();
         }
     }
 
+    @Override
     public synchronized long getNumberOfDelayedMessages() {
         return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 804e5f5..5af4b30 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2129,8 +2129,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
      * @return TopicPolicies is exist else return null.
      */
     private TopicPolicies getTopicPolicies(TopicName topicName) {
+        TopicName cloneTopicName = topicName;
+        if (topicName.isPartitioned()) {
+            cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
+        }
         try {
-            return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+            return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(cloneTopicName);
         } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
             log.warn("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
             return null;
@@ -2303,4 +2307,22 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         });
         return completableFuture;
     }
+
+    public long getDelayedDeliveryTickTimeMillis() {
+        // A tick time stored in the topic-level policies overrides the delayedDeliveryTickTimeMillis field.
+        final TopicPolicies policies = getTopicPolicies(TopicName.get(topic));
+        if (policies == null || !policies.isDelayedDeliveryTickTimeMillisSet()) {
+            return delayedDeliveryTickTimeMillis;
+        }
+        return policies.getDelayedDeliveryTickTimeMillis();
+    }
+
+    public boolean isDelayedDeliveryEnabled() {
+        // An enabled flag stored in the topic-level policies overrides the delayedDeliveryEnabled field.
+        final TopicPolicies policies = getTopicPolicies(TopicName.get(topic));
+        if (policies == null || !policies.isDelayedDeliveryEnabledSet()) {
+            return delayedDeliveryEnabled;
+        }
+        return policies.getDelayedDeliveryEnabled();
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 82f6241..5f6d2c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -19,15 +19,21 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import lombok.Cleanup;
 
 import org.apache.pulsar.client.api.Consumer;
@@ -37,6 +43,8 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -46,6 +54,9 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
     @Override
     @BeforeClass
     public void setup() throws Exception {
+        conf.setSystemTopicEnabled(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setDelayedDeliveryTickTimeMillis(1024);
         super.internalSetup();
         super.producerBaseSetup();
     }
@@ -315,4 +326,128 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
             }
         }
     }
+
+    @Test(timeOut = 20000)
+    public void testEnableAndDisableTopicDelayedDelivery() throws Exception {
+        String topicName = "persistent://public/default/topic-" + UUID.randomUUID().toString();
+        // Set a disabled policy on a fresh partitioned topic, read it back, then remove it again.
+        admin.topics().createPartitionedTopic(topicName, 3);
+        assertNull(admin.topics().getDelayedDeliveryPolicy(topicName));
+        DelayedDeliveryPolicies delayedDeliveryPolicies = new DelayedDeliveryPolicies(2000, false);
+        admin.topics().setDelayedDeliveryPolicy(topicName, delayedDeliveryPolicies);
+        // Poll (up to 50 x 100 ms) until the newly set policy is returned.
+        for (int i = 0; i < 50; i++) {
+            Thread.sleep(100);
+            if (admin.topics().getDelayedDeliveryPolicy(topicName) != null) {
+                break;
+            }
+        }
+
+        assertFalse(admin.topics().getDelayedDeliveryPolicy(topicName).isActive());
+        assertEquals(2000, admin.topics().getDelayedDeliveryPolicy(topicName).getTickTime());
+
+        admin.topics().removeDelayedDeliveryPolicy(topicName);
+        // Poll (up to 50 x 100 ms) until the policy is no longer returned.
+        for (int i = 0; i < 50; i++) {
+            Thread.sleep(100);
+            if (admin.topics().getDelayedDeliveryPolicy(topicName) == null) {
+                break;
+            }
+        }
+        assertNull(admin.topics().getDelayedDeliveryPolicy(topicName));
+    }
+
+    @Test(timeOut = 20000)
+    public void testEnableTopicDelayedDelivery() throws Exception {
+        final String topicName = "persistent://public/default/test" + UUID.randomUUID().toString();
+
+        admin.topics().createPartitionedTopic(topicName, 3);
+        assertNull(admin.topics().getDelayedDeliveryPolicy(topicName));
+        // 1. Set a topic-level policy: delayed delivery enabled, tick time 2000 ms.
+        DelayedDeliveryPolicies delayedDeliveryPolicies = new DelayedDeliveryPolicies(2000, true);
+        admin.topics().setDelayedDeliveryPolicy(topicName, delayedDeliveryPolicies);
+        // Poll (up to 50 x 100 ms) until the policy is returned.
+        for (int i = 0; i < 50; i++) {
+            Thread.sleep(100);
+            if (admin.topics().getDelayedDeliveryPolicy(topicName) != null) {
+                break;
+            }
+        }
+        // 2. Set up a Shared-subscription consumer and a producer on the topic.
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("test-sub" + System.currentTimeMillis())
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName).create();
+        // 3. Send 10 messages, each to be delivered after 5 seconds.
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage()
+                    .value("delayed-msg-" + i)
+                    .deliverAfter(5, TimeUnit.SECONDS)
+                    .sendAsync();
+        }
+        producer.flush();
+
+        // 4. Nothing may arrive during the first 3 seconds, since every message is delayed by 5 seconds.
+        assertNull(consumer.receive(3, TimeUnit.SECONDS));
+
+        Set<String> delayedMessages = new HashSet<>();
+        for (int i = 0; i < 10; i++) {
+            Message<String> msg = consumer.receive(4, TimeUnit.SECONDS);
+            delayedMessages.add(msg.getValue());
+            consumer.acknowledge(msg);
+        }
+        for (int i = 0; i < 10; i++) {
+            assertTrue(delayedMessages.contains("delayed-msg-" + i));
+        }
+        // 5. Disable delayed delivery through the topic-level policy.
+        delayedDeliveryPolicies.setActive(false);
+        admin.topics().setDelayedDeliveryPolicy(topicName, delayedDeliveryPolicies);
+        // Poll (up to 50 x 100 ms) until the policy is reported as inactive.
+        for (int i = 0; i < 50; i++) {
+            Thread.sleep(100);
+            if (!admin.topics().getDelayedDeliveryPolicy(topicName).isActive()) {
+                break;
+            }
+        }
+        producer.newMessage().value("disabled-msg").deliverAfter(5, TimeUnit.SECONDS).send();
+        // 6. Delayed delivery is disabled, so the message is received within 1 second despite deliverAfter(5 s).
+        Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        consumer.acknowledge(msg);
+        // 7. Re-enable with a very long tick time; per the original note, trackDelayedDelivery then fails and the message arrives immediately.
+        delayedDeliveryPolicies.setActive(true);
+        delayedDeliveryPolicies.setTickTime(Integer.MAX_VALUE);
+        admin.topics().setDelayedDeliveryPolicy(topicName, delayedDeliveryPolicies);
+        // Poll (up to 50 x 100 ms) until the policy is reported as active again.
+        for (int i = 0; i < 50; i++) {
+            Thread.sleep(100);
+            if (admin.topics().getDelayedDeliveryPolicy(topicName).isActive()) {
+                break;
+            }
+        }
+        producer.newMessage().value("long-tick-msg").deliverAfter(5, TimeUnit.SECONDS).send();
+        msg = consumer.receive(1, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        consumer.acknowledge(msg);
+        // 8. Remove the topic-level policy; the fallback setting (namespace level, per the original note) applies again.
+        admin.topics().removeDelayedDeliveryPolicy(topicName);
+        // Poll (up to 50 x 100 ms) until the policy is no longer returned.
+        for (int i = 0; i < 50; i++) {
+            Thread.sleep(100);
+            if (admin.topics().getDelayedDeliveryPolicy(topicName) == null) {
+                break;
+            }
+        }
+        producer.newMessage().value("long-tick-msg").deliverAfter(2, TimeUnit.SECONDS).send();
+        msg = consumer.receive(1, TimeUnit.SECONDS);
+        assertNull(msg);
+        msg = consumer.receive(3, TimeUnit.SECONDS);
+        assertNotNull(msg);
+    }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 72098ee..a40dcc1 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -1436,6 +1437,53 @@ public interface Topics {
     void removeBacklogQuota(String topic) throws PulsarAdminException;
 
     /**
+     * Get the delayed delivery policy set on the specified topic.
+     * @param topic name of the topic to query
+     * @return the topic's delayed delivery policy, or {@code null} if no topic-level policy is set
+     * @throws PulsarAdminException if the request fails or times out
+     */
+    DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic) throws PulsarAdminException;
+
+    /**
+     * Get the delayed delivery policy set on the specified topic asynchronously.
+     * @param topic name of the topic to query
+     * @return a future completed with the topic's delayed delivery policy, or exceptionally if the request fails
+     */
+    CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryPolicyAsync(String topic);
+
+    /**
+     * Set the delayed delivery policy for the specified topic.
+     * @param topic name of the topic to update
+     * @param delayedDeliveryPolicies the policy (active flag and tick time) to store for the topic
+     * @throws PulsarAdminException if the request fails or times out
+     */
+    void setDelayedDeliveryPolicy(String topic
+            , DelayedDeliveryPolicies delayedDeliveryPolicies) throws PulsarAdminException;
+
+    /**
+     * Set the delayed delivery policy for the specified topic asynchronously.
+     * @param topic name of the topic to update
+     * @param delayedDeliveryPolicies the policy (active flag and tick time) to store for the topic
+     * @return a future that completes once the request has been processed
+     */
+    CompletableFuture<Void> setDelayedDeliveryPolicyAsync(String topic
+            , DelayedDeliveryPolicies delayedDeliveryPolicies);
+
+    /**
+     * Remove the delayed delivery policy from the specified topic asynchronously.
+     * @param topic name of the topic whose policy is removed
+     * @return a future that completes once the request has been processed
+     */
+    CompletableFuture<Void> removeDelayedDeliveryPolicyAsync(String topic);
+
+    /**
+     * Remove the delayed delivery policy from the specified topic.
+     * @param topic name of the topic whose policy is removed
+     * @throws PulsarAdminException if the request fails or times out
+     */
+    void removeDelayedDeliveryPolicy(String topic) throws PulsarAdminException;
+
+    /**
      * Set message TTL for a topic.
      *
      * @param topic
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 4df515a..6b04e60 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -72,6 +72,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -1431,6 +1432,85 @@ public class TopicsImpl extends BaseResource implements Topics {
     }
 
     @Override
+    public DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic) throws PulsarAdminException {
+        try {
+            // Block on the async variant for at most readTimeoutMs milliseconds.
+            return getDelayedDeliveryPolicyAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        }
+    }
+
+    @Override
+    public CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryPolicyAsync(String topic) {
+        final WebTarget target = topicPath(validateTopic(topic), "delayedDelivery");
+        final CompletableFuture<DelayedDeliveryPolicies> result = new CompletableFuture<>();
+        // Bridge the request callback onto the future handed back to the caller.
+        asyncGetRequest(target, new InvocationCallback<DelayedDeliveryPolicies>() {
+            @Override
+            public void completed(DelayedDeliveryPolicies policies) {
+                result.complete(policies);
+            }
+
+            @Override
+            public void failed(Throwable error) {
+                result.completeExceptionally(getApiException(error.getCause()));
+            }
+        });
+        return result;
+    }
+
+    @Override
+    public CompletableFuture<Void> removeDelayedDeliveryPolicyAsync(String topic) {
+        // Issue a DELETE against the topic's "delayedDelivery" resource.
+        final WebTarget target = topicPath(validateTopic(topic), "delayedDelivery");
+        return asyncDeleteRequest(target);
+    }
+
+    @Override
+    public void removeDelayedDeliveryPolicy(String topic) throws PulsarAdminException {
+        try { // blocking wrapper around the async variant, waiting at most readTimeoutMs milliseconds
+            removeDelayedDeliveryPolicyAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setDelayedDeliveryPolicyAsync(String topic,
+            DelayedDeliveryPolicies delayedDeliveryPolicies) {
+        // POST the policy as JSON to the topic's "delayedDelivery" resource.
+        final WebTarget target = topicPath(validateTopic(topic), "delayedDelivery");
+        return asyncPostRequest(target, Entity.entity(delayedDeliveryPolicies, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void setDelayedDeliveryPolicy(String topic,
+            DelayedDeliveryPolicies delayedDeliveryPolicies) throws PulsarAdminException {
+        try {
+            // Block on the async variant for at most readTimeoutMs milliseconds.
+            setDelayedDeliveryPolicyAsync(topic, delayedDeliveryPolicies).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        }
+    }
+
+    @Override
     public void setMessageTTL(String topic, int messageTTLInSecond) throws PulsarAdminException {
         try {
             TopicName topicName = validateTopic(topic);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 658233f..6791e05 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
@@ -111,6 +112,9 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("get-retention", new GetRetention());
         jcommander.addCommand("set-retention", new SetRetention());
         jcommander.addCommand("remove-retention", new RemoveRetention());
+        jcommander.addCommand("get-delayed-delivery", new GetDelayedDelivery());
+        jcommander.addCommand("set-delayed-delivery", new SetDelayedDelivery());
+        jcommander.addCommand("remove-delayed-delivery", new RemoveDelayedDelivery());
     }
 
     @Parameters(commandDescription = "Get the list of topics under a namespace.")
@@ -888,6 +892,58 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get the delayed delivery policy for a topic")
+    private class GetDelayedDelivery extends CliCommand {
+        @Parameter(description = "tenant/namespace/topic\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            // Resolve the topic from the positional argument and print its policy.
+            print(admin.topics().getDelayedDeliveryPolicy(validateTopicName(params)));
+        }
+    }
+
+    @Parameters(commandDescription = "Set the delayed delivery policy on a topic")
+    private class SetDelayedDelivery extends CliCommand {
+        @Parameter(description = "tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--enable", "-e" }, description = "Enable delayed delivery messages")
+        private boolean enable = false;
+
+        @Parameter(names = { "--disable", "-d" }, description = "Disable delayed delivery messages")
+        private boolean disable = false;
+
+        @Parameter(names = { "--time", "-t" }, description = "The tick time for when retrying on delayed delivery messages, " +
+                "affecting the accuracy of the delivery time compared to the scheduled time. (eg: 1s, 10s, 1m, 5h, 3d)")
+        private String delayedDeliveryTimeStr = "1s";
+
+        @Override
+        void run() throws PulsarAdminException {
+            String topicName = validateTopicName(params);
+            long delayedDeliveryTimeInMills = TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(delayedDeliveryTimeStr));
+            // Exactly one of --enable / --disable must be supplied.
+            if (enable == disable) {
+                throw new ParameterException("Need to specify either --enable or --disable");
+            }
+            // The policy's active flag comes from --enable; the tick time is sent in milliseconds.
+            admin.topics().setDelayedDeliveryPolicy(topicName, new DelayedDeliveryPolicies(delayedDeliveryTimeInMills, enable));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove the delayed delivery policy on a topic")
+    private class RemoveDelayedDelivery extends CliCommand {
+        @Parameter(description = "tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            // The positional argument names a topic (not a namespace); its topic-level policy is removed.
+            admin.topics().removeDelayedDeliveryPolicy(validateTopicName(params));
+        }
+    }
+
     @Parameters(commandDescription = "Get the message TTL for a topic")
     private class GetMessageTTL extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index f9fcaf8..796ff4a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -45,6 +45,15 @@ public class TopicPolicies {
     private Integer maxProducerPerTopic = null;
     private Integer maxConsumerPerTopic = null;
     private Integer maxConsumersPerSubscription = null;
+    private Long delayedDeliveryTickTimeMillis = null; // null means no topic-level tick time is set
+    private Boolean delayedDeliveryEnabled = null; // null means no topic-level enabled flag is set
+
+    public boolean isDelayedDeliveryTickTimeMillisSet(){
+        return delayedDeliveryTickTimeMillis != null; // true when a topic-level tick time is stored
+    }
+    public boolean isDelayedDeliveryEnabledSet(){
+        return delayedDeliveryEnabled != null; // true when a topic-level enabled flag is stored
+    }
 
     public boolean isBacklogQuotaSet() {
         return !backLogQuotaMap.isEmpty();