You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/08/28 13:33:39 UTC

[pulsar] branch master updated: support message dispatch rate on topic level (#7863)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 395037e  support message dispatch rate on topic level (#7863)
395037e is described below

commit 395037e9cfb03a3cbbc22347e95fa76dae282898
Author: hangc0276 <ha...@163.com>
AuthorDate: Fri Aug 28 21:33:11 2020 +0800

    support message dispatch rate on topic level (#7863)
    
    ### Motivation
    Support message dispatch rate on topic level.
    Based on the system topic function.
    
    ### Modifications
    Support set message dispatch rate on topic level.
    Support get message dispatch rate on topic level.
    Support remove message dispatch rate on topic level.
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 44 +++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 89 ++++++++++++++++++++++
 .../SystemTopicBasedTopicPoliciesService.java      | 18 +++++
 .../broker/service/TopicPoliciesService.java       | 18 +++++
 .../pulsar/broker/service/TopicPolicyListener.java | 23 ++++++
 .../service/persistent/DispatchRateLimiter.java    | 40 ++++++++--
 .../broker/service/persistent/PersistentTopic.java | 36 ++++++++-
 .../broker/admin/TopicPoliciesDisableTest.java     | 21 +++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 40 ++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     | 68 +++++++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 78 +++++++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 57 ++++++++++++++
 .../pulsar/common/policies/data/TopicPolicies.java |  5 ++
 13 files changed, 530 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 51c8952..0123aa7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -95,6 +95,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
+import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
@@ -3056,4 +3057,47 @@ public class PersistentTopicsBase extends AdminResource {
         });
 
     }
+
+    protected Optional<DispatchRate> internalGetDispatchRate() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        return getTopicPolicies(topicName).map(TopicPolicies::getDispatchRate);
+    }
+
+    protected CompletableFuture<Void> internalSetDispatchRate(DispatchRate dispatchRate) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        if (dispatchRate == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        TopicPolicies topicPolicies = getTopicPolicies(topicName)
+            .orElseGet(TopicPolicies::new);
+        topicPolicies.setDispatchRate(dispatchRate);
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+    }
+
+    protected CompletableFuture<Void> internalRemoveDispatchRate() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
+        if (!topicPolicies.isPresent()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        topicPolicies.get().setDispatchRate(null);
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 521fb75..c00f270 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -30,6 +30,7 @@ import io.swagger.annotations.ApiResponses;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -55,6 +56,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
@@ -1695,5 +1697,92 @@ public class PersistentTopics extends PersistentTopicsBase {
         }
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/dispatchRate")
+    @ApiOperation(value = "Get dispatch rate configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getDispatchRate(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            Optional<DispatchRate> dispatchRate = internalGetDispatchRate();
+            if (!dispatchRate.isPresent()) {
+                asyncResponse.resume(Response.noContent().build());
+            } else {
+                asyncResponse.resume(dispatchRate.get());
+            }
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/dispatchRate")
+    @ApiOperation(value = "Set message dispatch rate configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void setDispatchRate(@Suspended final AsyncResponse asyncResponse,
+                                @PathParam("tenant") String tenant,
+                                @PathParam("namespace") String namespace,
+                                @PathParam("topic") @Encoded String encodedTopic,
+                                @ApiParam(value = "Dispatch rate for the specified topic") DispatchRate dispatchRate) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetDispatchRate(dispatchRate).whenComplete((r, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed to set topic dispatch rate", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed to set topic dispatch rate");
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                try {
+                    log.info("[{}] Successfully set topic dispatch rate: tenant={}, namespace={}, topic={}, dispatchRate={}",
+                        clientAppId(),
+                        tenant,
+                        namespace,
+                        topicName.getLocalName(),
+                        jsonMapper().writeValueAsString(dispatchRate));
+                } catch (JsonProcessingException ignore) {}
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/dispatchRate")
+    @ApiOperation(value = "Remove message dispatch rate configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
+        @ApiResponse(code = 404, message = "Topic does not exist"),
+        @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+        @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeDispatchRate(@Suspended final AsyncResponse asyncResponse,
+                                   @PathParam("tenant") String tenant,
+                                   @PathParam("namespace") String namespace,
+                                   @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalRemoveDispatchRate().whenComplete((r, ex) -> {
+            if (ex != null) {
+                log.error("Failed to remove topic dispatch rate", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully remove topic dispatch rate: tenant={}, namespace={}, topic={}",
+                    clientAppId(),
+                    tenant,
+                    namespace,
+                    topicName.getLocalName());
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 7c561c9..84add91 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -112,8 +113,15 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                             });
                     })
                 );
+                if (listeners.get(topicName) != null) {
+                    for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
+                        listener.onUpdate(policies);
+                    }
+                }
             }
         });
+
+
         return result;
     }
 
@@ -329,5 +337,15 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         return policyCacheInitMap.get(namespaceName);
     }
 
+    @Override
+    public void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
+        listeners.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).add(listener);
+    }
+
+    @Override
+    public void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
+        listeners.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).remove(listener);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 1d9c382..9fde3a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -24,7 +24,10 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
 
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Topic policies service
@@ -32,6 +35,7 @@ import java.util.concurrent.CompletableFuture;
 public interface TopicPoliciesService {
 
     TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled();
+    Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
 
     /**
      * Update policies for a topic async
@@ -73,6 +77,10 @@ public interface TopicPoliciesService {
      */
     void start();
 
+    void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener);
+
+    void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener);
+
     class TopicPoliciesServiceDisabled implements TopicPoliciesService {
 
         @Override
@@ -106,5 +114,15 @@ public interface TopicPoliciesService {
         public void start() {
             //No-op
         }
+
+        @Override
+        public void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
+            //No-op
+        }
+
+        @Override
+        public void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
+            //No-op
+        }
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPolicyListener.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPolicyListener.java
new file mode 100644
index 0000000..0e39edb
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPolicyListener.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+public interface TopicPolicyListener<T> {
+  void onUpdate(T data);
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 29eed73..4cc711f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -29,10 +29,12 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -147,23 +149,51 @@ public class DispatchRateLimiter {
      * default broker dispatch-throttling-rate
      */
     public void updateDispatchRate() {
-        DispatchRate dispatchRate = getPoliciesDispatchRate(brokerService);
+        Optional<DispatchRate> dispatchRate = getSystemTopicDispatchRate(brokerService, topicName);
+        if (!dispatchRate.isPresent()) {
+            dispatchRate =Optional.ofNullable(getPoliciesDispatchRate(brokerService));
 
-        if (dispatchRate == null) {
-            dispatchRate = createDispatchRate();
+            if (!dispatchRate.isPresent()) {
+                dispatchRate = Optional.of(createDispatchRate());
+            }
         }
 
-        updateDispatchRate(dispatchRate);
-        log.info("[{}] configured {} message-dispatch rate at broker {}", this.topicName, type, dispatchRate);
+        updateDispatchRate(dispatchRate.get());
+        log.info("[{}] configured {} message-dispatch rate at broker {}", this.topicName, type, dispatchRate.get());
     }
 
     public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
             String topicName, Type type) {
         final ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
+        if (serviceConfig.isTopicLevelPoliciesEnabled() && type == Type.TOPIC) {
+            Optional<DispatchRate> dispatchRate = getSystemTopicDispatchRate(brokerService, topicName);
+            if (dispatchRate.isPresent()) {
+                return true;
+            }
+        }
+
         policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName);
         return isDispatchRateNeeded(serviceConfig, policies, topicName, type);
     }
 
+    public static Optional<DispatchRate> getSystemTopicDispatchRate(BrokerService brokerService, String topicName) {
+        Optional<DispatchRate> dispatchRate = Optional.empty();
+        final ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
+        if (serviceConfiguration.isTopicLevelPoliciesEnabled()) {
+            try {
+                dispatchRate = Optional.ofNullable(brokerService.pulsar()
+                    .getTopicPoliciesService().getTopicPolicies(TopicName.get(topicName)))
+                    .map(TopicPolicies::getDispatchRate);
+            } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e){
+                log.debug("Topic {} policies cache have not init.", topicName);
+            } catch (Exception e) {
+                log.debug("[{}] Failed to get topic policies. Exception: {}", topicName, e);
+            }
+        }
+
+        return dispatchRate;
+    }
+
     public static boolean isDispatchRateNeeded(final ServiceConfiguration serviceConfig,
             final Optional<Policies> policies, final String topicName, final Type type) {
         DispatchRate dispatchRate = getPoliciesDispatchRate(serviceConfig.getClusterName(), policies, type);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 711867c..3899313 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+
 import com.carrotsearch.hppc.ObjectObjectHashMap;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -87,7 +88,7 @@ import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.StreamingStats;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
-import org.apache.pulsar.broker.service.TopicPoliciesService;
+import org.apache.pulsar.broker.service.TopicPolicyListener;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
@@ -133,7 +134,7 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback {
+public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback, TopicPolicyListener<TopicPolicies> {
 
     // Managed ledger associated with the topic
     protected final ManagedLedger ledger;
@@ -225,6 +226,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
 
         initializeDispatchRateLimiterIfNeeded(Optional.empty());
+        brokerService.getPulsar().getTopicPoliciesService().registerListener(TopicName.get(topic), this);
 
         this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
 
@@ -901,6 +903,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
                                     subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
 
+                                    brokerService.pulsar().getTopicPoliciesService().unregisterListener(TopicName.get(topic), getPersistentTopic());
                                     log.info("[{}] Topic deleted", topic);
                                     deleteFuture.complete(null);
                                 }
@@ -989,6 +992,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
                     subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
 
+                    brokerService.pulsar().getTopicPoliciesService().unregisterListener(TopicName.get(topic), getPersistentTopic());
                     log.info("[{}] Topic closed", topic);
                     closeFuture.complete(null);
                 }
@@ -2344,6 +2348,34 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return maxUnackedMessagesOnSubscription;
     }
 
+    @Override
+    public void onUpdate(TopicPolicies policies) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] update topic policy: {}", topic, policies);
+        }
+
+        initializeTopicDispatchRateLimiterIfNeeded(Optional.ofNullable(policies));
+        if (this.dispatchRateLimiter.isPresent() && policies != null
+                && policies.getDispatchRate() != null) {
+            dispatchRateLimiter.ifPresent(dispatchRateLimiter ->
+                dispatchRateLimiter.updateDispatchRate(policies.getDispatchRate()));
+        }
+    }
+
+    private void initializeTopicDispatchRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
+        synchronized (dispatchRateLimiter) {
+            if (!dispatchRateLimiter.isPresent() && policies.isPresent() &&
+                policies.get().getDispatchRate() != null) {
+                this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
+            }
+        }
+    }
+
+    private PersistentTopic getPersistentTopic() {
+        return this;
+    }
+
+
     @VisibleForTesting
     public MessageDeduplication getMessageDeduplication() {
         return messageDeduplication;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
index 819cb5c..329c3c7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -129,4 +130,24 @@ public class TopicPoliciesDisableTest extends MockedPulsarServiceBaseTest {
             Assert.assertEquals(e.getStatusCode(), 405);
         }
     }
+
+    @Test
+    public void testDispatchRateDisabled() throws Exception {
+        DispatchRate dispatchRate = new DispatchRate();
+        log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic);
+
+        try {
+            admin.topics().setDispatchRate(testTopic, dispatchRate);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+
+        try {
+            admin.topics().getDispatchRate(testTopic);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 903ddce..ef16adb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -337,4 +338,43 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
 
         admin.topics().deletePartitionedTopic(testTopic, true);
     }
+
+
+    @Test
+    public void testGetSetDispatchRate() throws Exception {
+        DispatchRate dispatchRate = new DispatchRate(100, 10000, 1, true);
+        log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic);
+
+        admin.topics().setDispatchRate(testTopic, dispatchRate);
+        log.info("Dispatch Rate set success on topic: {}", testTopic);
+
+        Thread.sleep(3000);
+        DispatchRate getDispatchRate = admin.topics().getDispatchRate(testTopic);
+        log.info("Dispatch Rate: {} get on topic: {}", getDispatchRate, testTopic);
+        Assert.assertEquals(getDispatchRate, dispatchRate);
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
+
+    @Test
+    public void testRemoveDispatchRate() throws Exception {
+        DispatchRate dispatchRate = new DispatchRate(100, 10000, 1, true);
+        log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic);
+
+        admin.topics().setDispatchRate(testTopic, dispatchRate);
+        log.info("Dispatch Rate set success on topic: {}", testTopic);
+
+        Thread.sleep(3000);
+        DispatchRate getDispatchRate = admin.topics().getDispatchRate(testTopic);
+        log.info("Dispatch Rate: {} get on topic: {}", getDispatchRate, testTopic);
+        Assert.assertEquals(getDispatchRate, dispatchRate);
+
+        admin.topics().removeDispatchRate(testTopic);
+        Thread.sleep(3000);
+        log.info("Dispatch Rate get on topic: {} after remove", getDispatchRate, testTopic);
+        getDispatchRate = admin.topics().getDispatchRate(testTopic);
+        Assert.assertNull(getDispatchRate);
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 6476f27..93d3192 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -1845,4 +1846,71 @@ public interface Topics {
      * @return
      */
     CompletableFuture<Void> disableDeduplicationAsync(String topic);
+
+    /**
+     * Set message-dispatch-rate (topic can dispatch this many messages per second).
+     *
+     * @param topic
+     * @param dispatchRate
+     *            number of messages per second
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException;
+
+    /**
+     * Set message-dispatch-rate asynchronously.
+     * <p/>
+     * topic can dispatch this many messages per second
+     *
+     * @param topic
+     * @param dispatchRate
+     *            number of messages per second
+     */
+    CompletableFuture<Void> setDispatchRateAsync(String topic, DispatchRate dispatchRate);
+
+    /**
+     * Get message-dispatch-rate (topic can dispatch this many messages per second).
+     *
+     * @param topic
+     * @returns messageRate
+     *            number of messages per second
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    DispatchRate getDispatchRate(String topic) throws PulsarAdminException;
+
+    /**
+     * Get message-dispatch-rate asynchronously.
+     * <p/>
+     * Topic can dispatch this many messages per second.
+     *
+     * @param topic
+     * @returns messageRate
+     *            number of messages per second
+     */
+    CompletableFuture<DispatchRate> getDispatchRateAsync(String topic);
+
+    /**
+     * Remove message-dispatch-rate.
+     * <p/>
+     * Remove topic message dispatch rate
+     *
+     * @param topic
+     * @throws PulsarAdminException
+     *              unexpected error
+     */
+    void removeDispatchRate(String topic) throws PulsarAdminException;
+
+    /**
+     * Remove message-dispatch-rate asynchronously.
+     * <p/>
+     * Remove topic message dispatch rate
+     *
+     * @param topic
+     * @throws PulsarAdminException
+     *              unexpected error
+     */
+    CompletableFuture<Void> removeDispatchRateAsync(String topic) throws PulsarAdminException;
+
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 62a39b9..ffd9339 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -69,6 +69,7 @@ import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -1926,5 +1927,82 @@ public class TopicsImpl extends BaseResource implements Topics {
         return asyncDeleteRequest(path);
     }
 
+    @Override
+    public DispatchRate getDispatchRate(String topic) throws PulsarAdminException {
+        try {
+            return getDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<DispatchRate> getDispatchRateAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "dispatchRate");
+        final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+            new InvocationCallback<DispatchRate>() {
+                @Override
+                public void completed(DispatchRate dispatchRate) {
+                    future.complete(dispatchRate);
+                }
+
+                @Override
+                public void failed(Throwable throwable) {
+                    future.completeExceptionally(getApiException(throwable.getCause()));
+                }
+            });
+        return future;
+    }
+
+    @Override
+    public void setDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException {
+        try {
+            setDispatchRateAsync(topic, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setDispatchRateAsync(String topic, DispatchRate dispatchRate) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "dispatchRate");
+        return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void removeDispatchRate(String topic) throws PulsarAdminException {
+        try {
+            removeDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeDispatchRateAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "dispatchRate");
+        return asyncDeleteRequest(path);
+    }
+
+
     private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 22e514e..b84794e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -119,6 +120,9 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("get-persistence", new GetPersistence());
         jcommander.addCommand("set-persistence", new SetPersistence());
         jcommander.addCommand("remove-persistence", new RemovePersistence());
+        jcommander.addCommand("get-dispatch-rate", new GetDispatchRate());
+        jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
+        jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());
     }
 
     @Parameters(commandDescription = "Get the list of topics under a namespace.")
@@ -1105,4 +1109,57 @@ public class CmdTopics extends CmdBase {
             admin.topics().removePersistence(persistentTopic);
         }
     }
+
+    @Parameters(commandDescription = "Get message dispatch rate for a topic")
+    private class GetDispatchRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(admin.topics().getDispatchRate(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set message dispatch rate for a topic")
+    private class SetDispatchRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--msg-dispatch-rate",
+                "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private int msgDispatchRate = -1;
+
+        @Parameter(names = { "--byte-dispatch-rate",
+                "-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private long byteDispatchRate = -1;
+
+        @Parameter(names = { "--dispatch-rate-period",
+                "-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
+        private int dispatchRatePeriodSec = 1;
+
+        @Parameter(names = { "--relative-to-publish-rate",
+                "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
+        private boolean relativeToPublishRate = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().setDispatchRate(persistentTopic,
+                new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove message dispatch rate for a topic")
+    private class RemoveDispatchRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().removeDispatchRate(persistentTopic);
+        }
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 7864e0e..0e459c6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -49,6 +49,7 @@ public class TopicPolicies {
     private Integer maxUnackedMessagesOnSubscription = null;
     private Long delayedDeliveryTickTimeMillis = null;
     private Boolean delayedDeliveryEnabled = null;
+    private DispatchRate dispatchRate = null;
 
     public boolean isMaxUnackedMessagesOnConsumerSet() {
         return maxUnackedMessagesOnConsumer != null;
@@ -97,4 +98,8 @@ public class TopicPolicies {
     public boolean isMaxConsumersPerSubscriptionSet() {
         return maxConsumersPerSubscription != null;
     }
+
+    public boolean isDispatchRateSet() {
+        return dispatchRate != null;
+    }
 }