You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/05/20 02:59:01 UTC

[pulsar] branch master updated: Add rate limit support for Replicator between clusters (#4273)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 84d02ac  Add rate limit support for Replicator between clusters (#4273)
84d02ac is described below

commit 84d02ac4ad42497c052d627e834c5e07c0e3449a
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Mon May 20 10:58:57 2019 +0800

    Add rate limit support for Replicator between clusters (#4273)
    
    Currently the rate limit between replication clusters is not able to control.
    In Geo-replication, once a cluster is offline, and after a long time, if it comes back, it may get a lot of messages from other clusters, and may use all of the network bandwidth.
    This PR tries to provided a way to control the rate limit between cluster replications. Add rate limit support for Replicator between clusters.
    
    changes:
    - change DispatchRateLimiter.java to support 3 kind type: Topic, subscription, replicator.
    - add  DispatchRateLimiter support in PersistentReplicator.
    - add test and docs
---
 conf/broker.conf                                   |  10 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |  16 +-
 .../apache/pulsar/broker/admin/AdminResource.java  |   6 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  60 +++-
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |   4 +-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  30 +-
 .../pulsar/broker/service/BrokerService.java       |  30 +-
 .../apache/pulsar/broker/service/Replicator.java   |  12 +-
 .../service/persistent/DispatchRateLimiter.java    | 160 ++++++----
 .../PersistentDispatcherMultipleConsumers.java     |   7 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |   7 +-
 .../service/persistent/PersistentReplicator.java   |  80 ++++-
 .../broker/service/persistent/PersistentTopic.java |  20 +-
 .../org/apache/pulsar/broker/ConfigHelper.java     |  12 +-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   2 +-
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    |   2 +-
 .../broker/service/ReplicatorRateLimiterTest.java  | 325 +++++++++++++++++++++
 .../pulsar/broker/service/ServerCnxTest.java       |  10 +-
 .../org/apache/pulsar/client/admin/Namespaces.java |  24 +-
 .../client/admin/internal/NamespacesImpl.java      |  26 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  50 +++-
 .../pulsar/common/policies/data/Policies.java      |  14 +-
 site2/docs/admin-api-namespaces.md                 | 118 +++++++-
 23 files changed, 910 insertions(+), 115 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 37b1281..4d217a3 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -189,7 +189,15 @@ dispatchThrottlingRatePerSubscriptionInMsg=0
 
 # Default number of message-bytes dispatching throttling-limit for a subscription.
 # Using a value of 0, is disabling default message-byte dispatch-throttling.
-dispatchThrottlingRatePerSubscribeInByte=0
+dispatchThrottlingRatePerSubscriptionInByte=0
+
+# Default messages per second dispatch throttling-limit for every replicator in replication.
+# Using a value of 0, is disabling replication message dispatch-throttling
+dispatchThrottlingRatePerReplicatorInMsg=0
+
+# Default bytes per second dispatch throttling-limit for every replicator in replication.
+# Using a value of 0, is disabling replication message-byte dispatch-throttling
+dispatchThrottlingRatePerReplicatorInByte=0
 
 # By default we enable dispatch-throttling for both caught up consumers as well as consumers who have
 # backlog.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index e594208..f047703 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -380,6 +380,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "Default number of message-bytes dispatching throttling-limit for every topic. \n\n"
             + "Using a value of 0, is disabling default message-byte dispatch-throttling")
     private long dispatchThrottlingRatePerTopicInByte = 0;
+
     @FieldContext(
         dynamic = true,
         category = CATEGORY_POLICIES,
@@ -391,7 +392,20 @@ public class ServiceConfiguration implements PulsarConfiguration {
         category = CATEGORY_POLICIES,
         doc = "Default number of message-bytes dispatching throttling-limit for a subscription. \n\n"
             + "Using a value of 0, is disabling default message-byte dispatch-throttling.")
-    private long dispatchThrottlingRatePerSubscribeInByte = 0;
+    private long dispatchThrottlingRatePerSubscriptionInByte = 0;
+
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_POLICIES,
+        doc = "Default number of message dispatching throttling-limit for every replicator in replication. \n\n"
+            + "Using a value of 0, is disabling replication message dispatch-throttling")
+    private int dispatchThrottlingRatePerReplicatorInMsg = 0;
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_POLICIES,
+        doc = "Default number of message-bytes dispatching throttling-limit for every replicator in replication. \n\n"
+            + "Using a value of 0, is disabling replication message-byte dispatch-throttling")
+    private long dispatchThrottlingRatePerReplicatorInByte = 0;
 
     @FieldContext(
         dynamic = true,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 35d2286..2c87062 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -372,8 +372,8 @@ public abstract class AdminResource extends PulsarWebResource {
 
         final String cluster = config.getClusterName();
         // attach default dispatch rate polices
-        if (policies.clusterDispatchRate.isEmpty()) {
-            policies.clusterDispatchRate.put(cluster, dispatchRate());
+        if (policies.topicDispatchRate.isEmpty()) {
+            policies.topicDispatchRate.put(cluster, dispatchRate());
         }
 
         if (policies.subscriptionDispatchRate.isEmpty()) {
@@ -400,7 +400,7 @@ public abstract class AdminResource extends PulsarWebResource {
     protected DispatchRate subscriptionDispatchRate() {
         return new DispatchRate(
                 pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(),
-                pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte(),
+                pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInByte(),
                 1
         );
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 98fb11b..eab5cd9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -593,7 +593,7 @@ public abstract class NamespacesBase extends AdminResource {
                 log.debug("Failed to validate cluster ownership for {}-{}, {}", namespaceName.toString(), bundleRange, e.getMessage(), e);
             }
         }
-        
+
         // validate namespace ownership only if namespace is not owned by local-cluster (it happens when broker doesn't
         // receive replication-cluster change watch and still owning bundle
         if (!isOwnedByLocalCluster) {
@@ -657,7 +657,7 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected void internalSetDispatchRate(DispatchRate dispatchRate) {
+    protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) {
         log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
         validateSuperUserAccess();
 
@@ -668,7 +668,7 @@ public abstract class NamespacesBase extends AdminResource {
             // Force to read the data s.t. the watch to the cache content is setup.
             policiesNode = policiesCache().getWithStat(path).orElseThrow(
                     () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
-            policiesNode.getKey().clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
+            policiesNode.getKey().topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
 
             // Write back the new policies into zookeeper
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
@@ -694,11 +694,11 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected DispatchRate internalGetDispatchRate() {
+    protected DispatchRate internalGetTopicDispatchRate() {
         validateAdminAccessForTenant(namespaceName.getTenant());
 
         Policies policies = getNamespacePolicies(namespaceName);
-        DispatchRate dispatchRate = policies.clusterDispatchRate.get(pulsar().getConfiguration().getClusterName());
+        DispatchRate dispatchRate = policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName());
         if (dispatchRate != null) {
             return dispatchRate;
         } else {
@@ -806,6 +806,56 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+    protected void internalSetReplicatorDispatchRate(DispatchRate dispatchRate) {
+        log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
+        validateSuperUserAccess();
+
+        Entry<Policies, Stat> policiesNode = null;
+
+        try {
+            final String path = path(POLICIES, namespaceName.toString());
+            // Force to read the data s.t. the watch to the cache content is setup.
+            policiesNode = policiesCache().getWithStat(path).orElseThrow(
+                () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+            policiesNode.getKey().replicatorDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
+
+            // Write back the new policies into zookeeper
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
+                policiesNode.getValue().getVersion());
+            policiesCache().invalidate(path);
+
+            log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
+                namespaceName);
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}: does not exist",
+                clientAppId(), namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn(
+                "[{}] Failed to update the replicatorDispatchRate for cluster on namespace {} expected policy node version={} : concurrent modification",
+                clientAppId(), namespaceName, policiesNode.getValue().getVersion());
+
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (Exception e) {
+            log.error("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
+                namespaceName, e);
+            throw new RestException(e);
+        }
+    }
+
+    protected DispatchRate internalGetReplicatorDispatchRate() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+
+        Policies policies = getNamespacePolicies(namespaceName);
+        DispatchRate dispatchRate = policies.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
+        if (dispatchRate != null) {
+            return dispatchRate;
+        } else {
+            throw new RestException(Status.NOT_FOUND,
+                "replicator-Dispatch-rate is not configured for cluster " + pulsar().getConfiguration().getClusterName());
+        }
+    }
+
     protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 1f2c8c9..35b5713 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -435,7 +435,7 @@ public class Namespaces extends NamespacesBase {
     public void setDispatchRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, DispatchRate dispatchRate) {
         validateNamespaceName(property, cluster, namespace);
-        internalSetDispatchRate(dispatchRate);
+        internalSetTopicDispatchRate(dispatchRate);
     }
 
     @GET
@@ -446,7 +446,7 @@ public class Namespaces extends NamespacesBase {
     public DispatchRate getDispatchRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        return internalGetDispatchRate();
+        return internalGetTopicDispatchRate();
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 860ab84..b0715ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -185,7 +185,7 @@ public class Namespaces extends NamespacesBase {
         validateNamespaceName(property, namespace);
         internalGrantPermissionOnSubscription(subscription, roles);
     }
-    
+
     @DELETE
     @Path("/{tenant}/{namespace}/permissions/{role}")
     @ApiOperation(value = "Revoke all permissions to a role on a namespace.")
@@ -208,7 +208,7 @@ public class Namespaces extends NamespacesBase {
         validateNamespaceName(property, namespace);
         internalRevokePermissionsOnSubscription(subscription, role);
     }
-    
+
     @GET
     @Path("/{tenant}/{namespace}/replication")
     @ApiOperation(value = "Get the replication clusters for a namespace.", response = String.class, responseContainer = "List")
@@ -336,7 +336,7 @@ public class Namespaces extends NamespacesBase {
     public void setDispatchRate(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
             DispatchRate dispatchRate) {
         validateNamespaceName(tenant, namespace);
-        internalSetDispatchRate(dispatchRate);
+        internalSetTopicDispatchRate(dispatchRate);
     }
 
     @GET
@@ -347,7 +347,7 @@ public class Namespaces extends NamespacesBase {
     public DispatchRate getDispatchRate(@PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetDispatchRate();
+        return internalGetTopicDispatchRate();
     }
 
     @POST
@@ -393,6 +393,28 @@ public class Namespaces extends NamespacesBase {
         return internalGetSubscribeRate();
     }
 
+    @POST
+    @Path("/{tenant}/{namespace}/replicatorDispatchRate")
+    @ApiOperation(value = "Set replicator dispatch-rate throttling for all topics of the namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
+    public void setReplicatorDispatchRate(@PathParam("tenant") String tenant,
+                                            @PathParam("namespace") String namespace,
+                                            DispatchRate dispatchRate) {
+        validateNamespaceName(tenant, namespace);
+        internalSetReplicatorDispatchRate(dispatchRate);
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/replicatorDispatchRate")
+    @ApiOperation(value = "Get replicator dispatch-rate configured for the namespace, -1 represents not configured yet")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+        @ApiResponse(code = 404, message = "Namespace does not exist") })
+    public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        return internalGetReplicatorDispatchRate();
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/backlogQuotaMap")
     @ApiOperation(value = "Get backlog quota map on a namespace.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index fddda96..b26a6c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1195,11 +1195,11 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 log.warn("Failed to change load manager due to {}", ex);
             }
         });
-        // add listener to update message-dispatch-rate in msg
+        // add listener to update message-dispatch-rate in msg for topic
         registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> {
             updateTopicMessageDispatchRate();
         });
-        // add listener to update message-dispatch-rate in byte
+        // add listener to update message-dispatch-rate in byte for topic
         registerConfigurationListener("dispatchThrottlingRatePerTopicInByte", (dispatchRatePerTopicInByte) -> {
             updateTopicMessageDispatchRate();
         });
@@ -1207,14 +1207,22 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         registerConfigurationListener("autoSkipNonRecoverableData", (skipNonRecoverableLedger) -> {
             updateManagedLedgerConfig();
         });
-        // add listener to update message-dispatch-rate in msg
+        // add listener to update message-dispatch-rate in msg for subscription
         registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInMsg", (dispatchRatePerTopicInMsg) -> {
             updateSubscriptionMessageDispatchRate();
         });
-        // add listener to update message-dispatch-rate in byte
-        registerConfigurationListener("dispatchThrottlingRatePerSubscribeInByte", (dispatchRatePerTopicInByte) -> {
+        // add listener to update message-dispatch-rate in byte for subscription
+        registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInByte", (dispatchRatePerTopicInByte) -> {
             updateSubscriptionMessageDispatchRate();
         });
+        // add listener to update message-dispatch-rate in msg for replicator
+        registerConfigurationListener("dispatchThrottlingRatePerReplicatorInMsg", (dispatchRatePerTopicInMsg) -> {
+            updateReplicatorMessageDispatchRate();
+        });
+        // add listener to update message-dispatch-rate in byte for replicator
+        registerConfigurationListener("dispatchThrottlingRatePerReplicatorInByte", (dispatchRatePerTopicInByte) -> {
+            updateReplicatorMessageDispatchRate();
+        });
         // add more listeners here
     }
 
@@ -1243,6 +1251,18 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         });
     }
 
+    private void updateReplicatorMessageDispatchRate() {
+        this.pulsar().getExecutor().submit(() -> {
+            // update message-rate for each topic Replicator in Geo-replication
+            forEachTopic(topic ->
+                topic.getReplicators().forEach((name, persistentReplicator) -> {
+                    if (persistentReplicator.getRateLimiter().isPresent()) {
+                        persistentReplicator.getRateLimiter().get().updateDispatchRate();
+                    }
+                }));
+        });
+    }
+
     private void updateManagedLedgerConfig() {
         this.pulsar().getExecutor().execute(() -> {
             // update managed-ledger config of each topic
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index f8a23f6..7412b22 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -18,14 +18,17 @@
  */
 package org.apache.pulsar.broker.service;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 
 public interface Replicator {
 
     void startProducer();
-    
+
     ReplicatorStats getStats();
 
     CompletableFuture<Void> disconnect();
@@ -36,4 +39,11 @@ public interface Replicator {
 
     String getRemoteCluster();
 
+    default void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
+        //No-op
+    }
+
+    default Optional<DispatchRateLimiter> getRateLimiter() {
+        return Optional.empty();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 7e9052a..d7d0ade 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -38,23 +38,26 @@ import org.slf4j.LoggerFactory;
 
 public class DispatchRateLimiter {
 
+    public enum Type {
+        TOPIC,
+        SUBSCRIPTION,
+        REPLICATOR
+    }
+
     private final String topicName;
-    private final String subscriptionName;
+    private final Type type;
+
     private final BrokerService brokerService;
     private RateLimiter dispatchRateLimiterOnMessage;
     private RateLimiter dispatchRateLimiterOnByte;
 
-    public DispatchRateLimiter(PersistentTopic topic, String subscriptionName) {
+    public DispatchRateLimiter(PersistentTopic topic, Type type) {
         this.topicName = topic.getName();
-        this.subscriptionName = subscriptionName;
         this.brokerService = topic.getBrokerService();
+        this.type = type;
         updateDispatchRate();
     }
 
-    public DispatchRateLimiter(PersistentTopic topic) {
-        this(topic, null);
-    }
-
     /**
      * returns available msg-permit if msg-dispatch-throttling is enabled else it returns -1
      *
@@ -101,66 +104,104 @@ public class DispatchRateLimiter {
     }
 
     /**
+     * createDispatchRate according to broker service config.
+     *
+     * @return
+     */
+    private DispatchRate createDispatchRate() {
+        int dispatchThrottlingRateInMsg;
+        long dispatchThrottlingRateInByte;
+        ServiceConfiguration config = brokerService.pulsar().getConfiguration();
+
+        switch (type) {
+            case TOPIC:
+                dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerTopicInMsg();
+                dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerTopicInByte();
+                break;
+            case SUBSCRIPTION:
+                dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerSubscriptionInMsg();
+                dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerSubscriptionInByte();
+                break;
+            case REPLICATOR:
+                dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerReplicatorInMsg();
+                dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerReplicatorInByte();
+                break;
+            default:
+                dispatchThrottlingRateInMsg = -1;
+                dispatchThrottlingRateInByte = -1;
+        }
+
+        return new DispatchRate(dispatchThrottlingRateInMsg, dispatchThrottlingRateInByte, 1);
+    }
+
+    /**
      * Update dispatch-throttling-rate. gives first priority to namespace-policy configured dispatch rate else applies
      * default broker dispatch-throttling-rate
      */
     public void updateDispatchRate() {
         DispatchRate dispatchRate = getPoliciesDispatchRate(brokerService);
+
         if (dispatchRate == null) {
-            if (subscriptionName == null) {
-                dispatchRate = new DispatchRate(brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg(),
-                    brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInByte(), 1);
-            } else {
-                dispatchRate = new DispatchRate(brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(),
-                    brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte(), 1);
-            }
+            dispatchRate = createDispatchRate();
         }
+
         updateDispatchRate(dispatchRate);
-        log.info("[{}] [{}] configured message-dispatch rate at broker {}", this.topicName, this.subscriptionName, dispatchRate);
+        log.info("[{}] configured {} message-dispatch rate at broker {}", this.topicName, type, dispatchRate);
     }
 
-    
     public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
-            String topicName, String subscriptionName) {
+            String topicName, Type type) {
         final ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
         policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName);
-        return isDispatchRateNeeded(serviceConfig, policies, topicName, subscriptionName);
+        return isDispatchRateNeeded(serviceConfig, policies, topicName, type);
     }
 
     public static boolean isDispatchRateNeeded(final ServiceConfiguration serviceConfig,
-            final Optional<Policies> policies, final String topicName, final String subscriptionName) {
-        DispatchRate dispatchRate = getPoliciesDispatchRate(serviceConfig.getClusterName(), policies, topicName,
-                subscriptionName);
+            final Optional<Policies> policies, final String topicName, final Type type) {
+        DispatchRate dispatchRate = getPoliciesDispatchRate(serviceConfig.getClusterName(), policies, type);
         if (dispatchRate == null) {
-            if (subscriptionName == null) {
-                return serviceConfig.getDispatchThrottlingRatePerTopicInMsg() > 0
+            switch (type) {
+                case TOPIC:
+                    return serviceConfig.getDispatchThrottlingRatePerTopicInMsg() > 0
                         || serviceConfig.getDispatchThrottlingRatePerTopicInByte() > 0;
-            } else {
-                return serviceConfig.getDispatchThrottlingRatePerSubscriptionInMsg() > 0
-                        || serviceConfig.getDispatchThrottlingRatePerSubscribeInByte() > 0;
+                case SUBSCRIPTION:
+                    return serviceConfig.getDispatchThrottlingRatePerSubscriptionInMsg() > 0
+                        || serviceConfig.getDispatchThrottlingRatePerSubscriptionInByte() > 0;
+                case REPLICATOR:
+                    return serviceConfig.getDispatchThrottlingRatePerReplicatorInMsg() > 0
+                        || serviceConfig.getDispatchThrottlingRatePerReplicatorInByte() > 0;
+                default:
+                    log.error("error DispatchRateLimiter type: {} ", type);
+                    return false;
             }
         }
         return true;
     }
-    
+
     public void onPoliciesUpdate(Policies data) {
         String cluster = brokerService.pulsar().getConfiguration().getClusterName();
 
         DispatchRate dispatchRate;
-        if (subscriptionName == null) {
-            dispatchRate = data.clusterDispatchRate.get(cluster);
-        } else {
-            dispatchRate = data.subscriptionDispatchRate.get(cluster);
+
+        switch (type) {
+            case TOPIC:
+                dispatchRate = data.topicDispatchRate.get(cluster);
+                break;
+            case SUBSCRIPTION:
+                dispatchRate = data.subscriptionDispatchRate.get(cluster);
+                break;
+            case REPLICATOR:
+                dispatchRate = data.replicatorDispatchRate.get(cluster);
+                break;
+            default:
+                log.error("error DispatchRateLimiter type: {} ", type);
+                dispatchRate = null;
         }
+
         // update dispatch-rate only if it's configured in policies else ignore
         if (dispatchRate != null) {
-            int inMsg = (subscriptionName == null) ?
-                brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg() :
-                brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg();
-            long inByte = (subscriptionName == null) ?
-                brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInByte() :
-                brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte();
-            final DispatchRate newDispatchRate = new DispatchRate(inMsg, inByte, 1);
+            final DispatchRate newDispatchRate = createDispatchRate();
+
             // if policy-throttling rate is disabled and cluster-throttling is enabled then apply
             // cluster-throttling rate
             if (!isDispatchRateEnabled(dispatchRate) && isDispatchRateEnabled(newDispatchRate)) {
@@ -170,6 +211,29 @@ public class DispatchRateLimiter {
         }
     }
 
+    public static DispatchRate getPoliciesDispatchRate(final String cluster, Optional<Policies> policies, Type type) {
+        // return policy-dispatch rate only if it's enabled in policies
+        return policies.map(p -> {
+            DispatchRate dispatchRate;
+            switch (type) {
+                case TOPIC:
+                    dispatchRate = p.topicDispatchRate.get(cluster);
+                    break;
+                case SUBSCRIPTION:
+                    dispatchRate = p.subscriptionDispatchRate.get(cluster);
+                    break;
+                case REPLICATOR:
+                    dispatchRate = p.replicatorDispatchRate.get(cluster);
+                    break;
+                default:
+                    log.error("error DispatchRateLimiter type: {} ", type);
+                    return null;
+            }
+            return isDispatchRateEnabled(dispatchRate) ? dispatchRate : null;
+        }).orElse(null);
+    }
+
+
     /**
      * Gets configured dispatch-rate from namespace policies. Returns null if dispatch-rate is not configured
      *
@@ -178,10 +242,9 @@ public class DispatchRateLimiter {
     public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) {
         final String cluster = brokerService.pulsar().getConfiguration().getClusterName();
         final Optional<Policies> policies = getPolicies(brokerService, topicName);
-        return getPoliciesDispatchRate(cluster, policies, topicName, subscriptionName);
+        return getPoliciesDispatchRate(cluster, policies, type);
     }
-    
-    
+
     public static Optional<Policies> getPolicies(BrokerService brokerService, String topicName) {
         final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject();
         final String path = path(POLICIES, namespace.toString());
@@ -195,19 +258,6 @@ public class DispatchRateLimiter {
         return policies;
     }
 
-    public static DispatchRate getPoliciesDispatchRate(final String cluster, Optional<Policies> policies, final String topicName, final String subscriptionName) {
-        // return policy-dispatch rate only if it's enabled in policies
-        return policies.map(p -> {
-            DispatchRate dispatchRate;
-            if (subscriptionName == null) {
-                dispatchRate = p.clusterDispatchRate.get(cluster);
-            } else {
-                dispatchRate = p.subscriptionDispatchRate.get(cluster);
-            }
-            return isDispatchRateEnabled(dispatchRate) ? dispatchRate : null;
-        }).orElse(null);
-    }
-
     /**
      * Update dispatch rate by updating msg and byte rate-limiter. If dispatch-rate is configured < 0 then it closes
      * the rate-limiter and disables appropriate rate-limiter.
@@ -216,7 +266,7 @@ public class DispatchRateLimiter {
      */
     public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
         // synchronized to prevent race condition from concurrent zk-watch
-        log.info("[{}] [{}] setting message-dispatch-rate {}", topicName, subscriptionName, dispatchRate);
+        log.info("setting message-dispatch-rate {}", dispatchRate);
 
         long msgRate = dispatchRate.dispatchThrottlingRateInMsg;
         long byteRate = dispatchRate.dispatchThrottlingRateInByte;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index b9351c2..0bb4965 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.RedeliveryTracker;
 import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
 import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
@@ -643,10 +644,10 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
     @Override
     public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
         if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
-                .isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), name)) {
-            this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, name));
+                .isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), Type.SUBSCRIPTION)) {
+            this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.SUBSCRIPTION));
         }
     }
-    
+
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 4356fbe..e1e17a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.RedeliveryTracker;
 import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
@@ -495,10 +496,10 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
     @Override
     public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
         if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
-                .isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), name)) {
-            this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, name));
+                .isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), Type.SUBSCRIPTION)) {
+            this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.SUBSCRIPTION));
         }
     }
-    
+
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index cda58e3..2253104 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
+
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -40,11 +43,13 @@ import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.Replicator;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.SendCallback;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
@@ -59,6 +64,8 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
     private final PersistentTopic topic;
     private final ManagedCursor cursor;
 
+    private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
+
     private int readBatchSize;
 
     private final int producerQueueThreshold;
@@ -101,6 +108,8 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
             topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize());
         producerQueueThreshold = (int) (producerQueueSize * 0.9);
 
+        this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
+
         startProducer();
     }
 
@@ -147,9 +156,53 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
     }
 
 
-    protected void readMoreEntries() {
+    /**
+     * Calculate available permits for read entries.
+     *
+     * @return
+     *   0:  Producer queue is full, no permits.
+     *  -1:  Rate Limiter reaches limit.
+     *  >0:  available permits for read entries.
+     */
+    private int getAvailablePermits() {
         int availablePermits = producerQueueSize - PENDING_MESSAGES_UPDATER.get(this);
 
+        // return 0, if Producer queue is full, it will pause read entries.
+        if (availablePermits <= 0) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{} -> {}] Producer queue is full, availablePermits: {}, pause reading",
+                    topicName, localCluster, remoteCluster, availablePermits);
+            }
+            return 0;
+        }
+
+        // handle rate limit
+        if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
+            DispatchRateLimiter rateLimiter = dispatchRateLimiter.get();
+            // no permits from rate limit
+            if (!rateLimiter.hasMessageDispatchPermit()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{} -> {}] message-read exceeded topic replicator message-rate {}/{}, schedule after a {}",
+                        topicName, localCluster, remoteCluster,
+                        rateLimiter.getDispatchRateOnMsg(), rateLimiter.getDispatchRateOnByte(),
+                        MESSAGE_RATE_BACKOFF_MS);
+                }
+                return -1;
+            }
+
+            // if dispatch-rate is in msg then read only msg according to available permit
+            long availablePermitsOnMsg = rateLimiter.getAvailableDispatchRateLimitOnMsg();
+            if (availablePermitsOnMsg > 0) {
+                availablePermits = Math.min(availablePermits, (int) availablePermitsOnMsg);
+            }
+        }
+
+        return availablePermits;
+    }
+
+    protected void readMoreEntries() {
+        int availablePermits = getAvailablePermits();
+
         if (availablePermits > 0) {
             int messagesToRead = Math.min(availablePermits, readBatchSize);
             if (!isWritable()) {
@@ -174,10 +227,14 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
                             localCluster, remoteCluster, messagesToRead);
                 }
             }
+        } else if (availablePermits == -1) {
+            // no permits from rate limit
+            topic.getBrokerService().executor().schedule(
+                () -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
         } else {
             if (log.isDebugEnabled()) {
-                log.debug("[{}][{} -> {}] Producer queue is full, pause reading", topicName, localCluster,
-                        remoteCluster);
+                log.debug("[{}][{} -> {}] No Permits for reading. availablePermits: {}",
+                    topicName, localCluster, remoteCluster, availablePermits);
             }
         }
     }
@@ -267,6 +324,10 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
                     continue;
                 }
 
+                if (dispatchRateLimiter.isPresent()) {
+                    dispatchRateLimiter.get().tryDispatchPermit(1, entry.getLength());
+                }
+
                 // Increment pending messages for messages produced locally
                 PENDING_MESSAGES_UPDATER.incrementAndGet(this);
 
@@ -576,5 +637,18 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
         expiryMonitor.expireMessages(messageTTLInSeconds);
     }
 
+    @Override
+    public Optional<DispatchRateLimiter> getRateLimiter() {
+        return dispatchRateLimiter;
+    }
+
+    @Override
+    public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
+        if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
+            .isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), Type.REPLICATOR)) {
+            this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.REPLICATOR));
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4cbcf11..b0392f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -81,6 +81,7 @@ import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.StreamingStats;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
@@ -277,17 +278,23 @@ public class PersistentTopic implements Topic, AddEntryCallback {
 
     private void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
         synchronized (dispatchRateLimiter) {
+            // dispatch rate limiter for topic
             if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
-                    .isDispatchRateNeeded(brokerService, policies, topic, null)) {
-                this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this));
+                    .isDispatchRateNeeded(brokerService, policies, topic, Type.TOPIC)) {
+                this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
             }
             if (!subscribeRateLimiter.isPresent() && SubscribeRateLimiter
                     .isDispatchRateNeeded(brokerService, policies, topic)) {
                 this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
             }
-            subscriptions.forEach((name, subscription) -> {
-                subscription.getDispatcher().initializeDispatchRateLimiterIfNeeded(policies);
-            });
+
+            // dispatch rate limiter for each subscription
+            subscriptions.forEach((name, subscription) ->
+                subscription.getDispatcher().initializeDispatchRateLimiterIfNeeded(policies));
+
+            // dispatch rate limiter for each replicator
+            replicators.forEach((name, replicator) ->
+                replicator.initializeDispatchRateLimiterIfNeeded(policies));
         }
     }
 
@@ -1656,6 +1663,9 @@ public class PersistentTopic implements Topic, AddEntryCallback {
                 sub.getDispatcher().getRateLimiter().get().onPoliciesUpdate(data);
             }
         });
+        replicators.forEach((name, replicator) ->
+            replicator.getRateLimiter().get().onPoliciesUpdate(data)
+        );
         checkMessageExpiry();
         CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
         CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java
index 020d7b3..108c7ac 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java
@@ -41,7 +41,7 @@ public class ConfigHelper {
         );
     }
 
-    public static DispatchRate dispatchRate(ServiceConfiguration configuration) {
+    public static DispatchRate topicDispatchRate(ServiceConfiguration configuration) {
         return new DispatchRate(
                 configuration.getDispatchThrottlingRatePerTopicInMsg(),
                 configuration.getDispatchThrottlingRatePerTopicInByte(),
@@ -52,11 +52,19 @@ public class ConfigHelper {
     public static DispatchRate subscriptionDispatchRate(ServiceConfiguration configuration) {
         return new DispatchRate(
                 configuration.getDispatchThrottlingRatePerSubscriptionInMsg(),
-                configuration.getDispatchThrottlingRatePerSubscribeInByte(),
+                configuration.getDispatchThrottlingRatePerSubscriptionInByte(),
                 1
         );
     }
 
+    public static DispatchRate replicatorDispatchRate(ServiceConfiguration configuration) {
+        return new DispatchRate(
+            configuration.getDispatchThrottlingRatePerReplicatorInMsg(),
+            configuration.getDispatchThrottlingRatePerReplicatorInByte(),
+            1
+        );
+    }
+
     public static SubscribeRate subscribeRate(ServiceConfiguration configuration) {
         return new SubscribeRate(
                 configuration.getSubscribeThrottlingRatePerConsumer(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 66e9067..c8ee9ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -637,7 +637,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
         // set default quotas on namespace
         Policies.setStorageQuota(policies, ConfigHelper.backlogQuota(conf));
-        policies.clusterDispatchRate.put("test", ConfigHelper.dispatchRate(conf));
+        policies.topicDispatchRate.put("test", ConfigHelper.topicDispatchRate(conf));
         policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf));
         policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf));
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index eda4409..42ea2ce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -632,7 +632,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
 
         // set default quotas on namespace
         Policies.setStorageQuota(policies, ConfigHelper.backlogQuota(conf));
-        policies.clusterDispatchRate.put("test", ConfigHelper.dispatchRate(conf));
+        policies.topicDispatchRate.put("test", ConfigHelper.topicDispatchRate(conf));
         policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf));
         policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf));
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
new file mode 100644
index 0000000..4631b07
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.Sets;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import lombok.Cleanup;
+
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Starts 3 brokers that are in 3 different clusters
+ */
+public class ReplicatorRateLimiterTest extends ReplicatorTestBase {
+
+    protected String methodName;
+
+    @BeforeMethod
+    public void beforeMethod(Method m) throws Exception {
+        methodName = m.getName();
+    }
+
+    @Override
+    @BeforeClass(timeOut = 30000)
+    void setup() throws Exception {
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(timeOut = 30000)
+    void shutdown() throws Exception {
+        super.shutdown();
+    }
+
+    enum DispatchRateType {
+        messageRate, byteRate
+    }
+
+    @DataProvider(name = "dispatchRateType")
+    public Object[][] dispatchRateProvider() {
+        return new Object[][] { { DispatchRateType.messageRate }, { DispatchRateType.byteRate } };
+    }
+
+    /**
+     * verifies dispatch rate for replicators get changed once namespace policies changed.
+     *
+     * 1. verify default replicator not configured.
+     * 2. change namespace setting of replicator dispatchRateMsg, verify topic changed.
+     * 3. change namespace setting of replicator dispatchRateByte, verify topic changed.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testReplicatorRateLimiterDynamicallyChange() throws Exception {
+        log.info("--- Starting ReplicatorTest::{} --- ", methodName);
+
+        final String namespace = "pulsar/replicatorchange";
+        final String topicName = "persistent://" + namespace + "/ratechange";
+
+        admin1.namespaces().createNamespace(namespace);
+        // 0. set 2 clusters, there will be 1 replicator in each topic
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
+            .build();
+
+        Producer<byte[]> producer = client1.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+        producer.close();
+
+        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+        // 1. default replicator throttling not configured
+        Assert.assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+
+        // 2. change namespace setting of replicator dispatchRateMsg, verify topic changed.
+        int messageRate = 100;
+        DispatchRate dispatchRateMsg = new DispatchRate(messageRate, -1, 360);
+        admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRateMsg);
+
+        boolean replicatorUpdated = false;
+        int retry = 5;
+        for (int i = 0; i < retry; i++) {
+            if (topic.getReplicators().values().get(0).getRateLimiter().isPresent()) {
+                replicatorUpdated = true;
+                break;
+            } else {
+                if (i != retry - 1) {
+                    Thread.sleep(100);
+                }
+            }
+        }
+        Assert.assertTrue(replicatorUpdated);
+        Assert.assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), messageRate);
+
+        // 3. change namespace setting of replicator dispatchRateByte, verify topic changed.
+        messageRate = 500;
+        DispatchRate dispatchRateByte = new DispatchRate(-1, messageRate, 360);
+        admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRateByte);
+        replicatorUpdated = false;
+        for (int i = 0; i < retry; i++) {
+            if (topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte() == messageRate) {
+                replicatorUpdated = true;
+                break;
+            } else {
+                if (i != retry - 1) {
+                    Thread.sleep(100);
+                }
+            }
+        }
+        Assert.assertTrue(replicatorUpdated);
+        Assert.assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), dispatchRateByte);
+    }
+
+    /**
+     * verifies dispatch rate for replicators works well for both Message limit and Byte limit .
+     *
+     * 1. verify topic replicator get configured.
+     * 2. namespace setting of replicator dispatchRate, verify consumer in other cluster could not receive all messages.
+     *
+     * @throws Exception
+     */
+    @Test(dataProvider =  "dispatchRateType", timeOut = 5000)
+    public void testReplicatorRateLimiterMessageNotReceivedAllMessages(DispatchRateType dispatchRateType) throws Exception {
+        log.info("--- Starting ReplicatorTest::{} --- ", methodName);
+
+        final String namespace = "pulsar/replicatorbyteandmsg" + dispatchRateType.toString();
+        final String topicName = "persistent://" + namespace + "/notReceivedAll";
+
+        admin1.namespaces().createNamespace(namespace);
+        // 0. set 2 clusters, there will be 1 replicator in each topic
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+
+        final int messageRate = 100;
+        DispatchRate dispatchRate;
+        if (DispatchRateType.messageRate.equals(dispatchRateType)) {
+            dispatchRate = new DispatchRate(messageRate, -1, 360);
+        } else {
+            dispatchRate = new DispatchRate(-1, messageRate, 360);
+        }
+        admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRate);
+
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
+            .build();
+
+        Producer<byte[]> producer = client1.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+
+        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+        boolean replicatorUpdated = false;
+        int retry = 5;
+        for (int i = 0; i < retry; i++) {
+            if (topic.getReplicators().values().get(0).getRateLimiter().isPresent()) {
+                replicatorUpdated = true;
+                break;
+            } else {
+                if (i != retry - 1) {
+                    Thread.sleep(100);
+                }
+            }
+        }
+        Assert.assertTrue(replicatorUpdated);
+        if (DispatchRateType.messageRate.equals(dispatchRateType)) {
+            Assert.assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), messageRate);
+        } else {
+            Assert.assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), messageRate);
+        }
+
+        @Cleanup
+        PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
+            .build();
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+
+        Consumer<byte[]> consumer = client2.newConsumer().topic(topicName).subscriptionName("sub2-in-cluster2").messageListener((c1, msg) -> {
+            Assert.assertNotNull(msg, "Message cannot be null");
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message [{}] in the listener", receivedMessage);
+            totalReceived.incrementAndGet();
+        }).subscribe();
+
+        int numMessages = 500;
+        // Asynchronously produce messages
+        for (int i = 0; i < numMessages; i++) {
+            producer.send(new byte[80]);
+        }
+
+        log.info("Received message number: [{}]", totalReceived.get());
+
+        Assert.assertTrue(totalReceived.get() < messageRate * 2);
+
+        consumer.close();
+        producer.close();
+    }
+
+    /**
+     * verifies dispatch rate for replicators works well for both Message limit.
+     *
+     * 1. verify topic replicator get configured.
+     * 2. namespace setting of replicator dispatchRate,
+     *      verify consumer in other cluster could receive all messages < message limit.
+     * 3. verify consumer in other cluster could not receive all messages > message limit.
+     *
+     * @throws Exception
+     */
+    @Test(timeOut = 5000)
+    public void testReplicatorRateLimiterMessageReceivedAllMessages() throws Exception {
+        log.info("--- Starting ReplicatorTest::{} --- ", methodName);
+
+        final String namespace = "pulsar/replicatormsg";
+        final String topicName = "persistent://" + namespace + "/notReceivedAll";
+
+        admin1.namespaces().createNamespace(namespace);
+        // 0. set 2 clusters, there will be 1 replicator in each topic
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+
+        final int messageRate = 100;
+        DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 360);
+        admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRate);
+
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
+            .build();
+
+        Producer<byte[]> producer = client1.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+
+        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+
+        boolean replicatorUpdated = false;
+        int retry = 5;
+        for (int i = 0; i < retry; i++) {
+            if (topic.getReplicators().values().get(0).getRateLimiter().isPresent()) {
+                replicatorUpdated = true;
+                break;
+            } else {
+                if (i != retry - 1) {
+                    Thread.sleep(100);
+                }
+            }
+        }
+        Assert.assertTrue(replicatorUpdated);
+        Assert.assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), messageRate);
+
+        @Cleanup
+        PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
+            .build();
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+
+        Consumer<byte[]> consumer = client2.newConsumer().topic(topicName).subscriptionName("sub2-in-cluster2").messageListener((c1, msg) -> {
+            Assert.assertNotNull(msg, "Message cannot be null");
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message [{}] in the listener", receivedMessage);
+            totalReceived.incrementAndGet();
+        }).subscribe();
+
+        int numMessages = 50;
+        // Asynchronously produce messages
+        for (int i = 0; i < numMessages; i++) {
+            producer.send(new byte[80]);
+        }
+
+        Thread.sleep(1000);
+        log.info("Received message number: [{}]", totalReceived.get());
+
+        Assert.assertEquals(totalReceived.get(), numMessages);
+
+
+        numMessages = 200;
+        // Asynchronously produce messages
+        for (int i = 0; i < numMessages; i++) {
+            producer.send(new byte[80]);
+        }
+        Thread.sleep(1000);
+        log.info("Received message number: [{}]", totalReceived.get());
+
+        Assert.assertEquals(totalReceived.get(), messageRate);
+
+        consumer.close();
+        producer.close();
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(ReplicatorRateLimiterTest.class);
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 1fe2f97..82f61c2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -51,8 +51,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import javax.naming.AuthenticationException;
-
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -1275,7 +1273,7 @@ public class ServerCnxTest {
         ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
         Policies policies = mock(Policies.class);
         policies.encryption_required = true;
-        policies.clusterDispatchRate = Maps.newHashMap();
+        policies.topicDispatchRate = Maps.newHashMap();
         doReturn(Optional.of(policies)).when(zkDataCache).get(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace()));
         doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(zkDataCache).getAsync(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace()));
         doReturn(zkDataCache).when(configCacheService).policiesCache();
@@ -1303,7 +1301,7 @@ public class ServerCnxTest {
         ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
         Policies policies = mock(Policies.class);
         policies.encryption_required = true;
-        policies.clusterDispatchRate = Maps.newHashMap();
+        policies.topicDispatchRate = Maps.newHashMap();
         doReturn(Optional.of(policies)).when(zkDataCache).get(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace()));
         doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(zkDataCache).getAsync(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace()));
         doReturn(zkDataCache).when(configCacheService).policiesCache();
@@ -1333,7 +1331,7 @@ public class ServerCnxTest {
         ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
         Policies policies = mock(Policies.class);
         policies.encryption_required = true;
-        policies.clusterDispatchRate = Maps.newHashMap();
+        policies.topicDispatchRate = Maps.newHashMap();
         doReturn(Optional.of(policies)).when(zkDataCache).get(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace()));
         doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(zkDataCache).getAsync(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace()));
         doReturn(zkDataCache).when(configCacheService).policiesCache();
@@ -1368,7 +1366,7 @@ public class ServerCnxTest {
         ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
         Policies policies = mock(Policies.class);
         policies.encryption_required = true;
-        policies.clusterDispatchRate = Maps.newHashMap();
+        policies.topicDispatchRate = Maps.newHashMap();
         doReturn(Optional.of(policies)).when(zkDataCache).get(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace()));
         doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(zkDataCache).getAsync(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace()));
         doReturn(zkDataCache).when(configCacheService).policiesCache();
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 6b8c22c..29f509b 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -384,7 +384,7 @@ public interface Namespaces {
      * @throws PulsarAdminException
      */
     void grantPermissionOnSubscription(String namespace, String subscription, Set<String> roles) throws PulsarAdminException;
-    
+
     /**
      * Revoke permissions on a subscription's admin-api access.
      * @param namespace
@@ -393,7 +393,7 @@ public interface Namespaces {
      * @throws PulsarAdminException
      */
     void revokePermissionOnSubscription(String namespace, String subscription, String role) throws PulsarAdminException;
-    
+
     /**
      * Get the replication clusters for a namespace.
      * <p>
@@ -932,6 +932,26 @@ public interface Namespaces {
      */
     DispatchRate getSubscriptionDispatchRate(String namespace) throws PulsarAdminException;
 
+    /**
+     * Set replicator-message-dispatch-rate (Replicators under this namespace can dispatch this many messages per second)
+     *
+     * @param namespace
+     * @param dispatchRate
+     *            number of messages per second
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setReplicatorDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException;
+
+    /** Get replicator-message-dispatch-rate (Replicators under this namespace can dispatch this many messages per second)
+     *
+     * @param namespace
+     * @returns DispatchRate
+     *            number of messages per second
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    DispatchRate getReplicatorDispatchRate(String namespace) throws PulsarAdminException;
 
     /**
      * Clear backlog for all topics on a namespace
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index b80d238..42cdfbd 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -233,7 +233,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
         }
     }
 
-    
+
     @Override
     public void grantPermissionOnSubscription(String namespace, String subscription, Set<String> roles)
             throws PulsarAdminException {
@@ -256,7 +256,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
             throw getApiException(e);
         }
     }
-    
+
     @Override
     public List<String> getNamespaceReplicationClusters(String namespace) throws PulsarAdminException {
         try {
@@ -550,6 +550,28 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     }
 
     @Override
+    public void setReplicatorDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "replicatorDispatchRate");
+            request(path).post(Entity.entity(dispatchRate, MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public DispatchRate getReplicatorDispatchRate(String namespace) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "replicatorDispatchRate");
+            return request(path).get(DispatchRate.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
     public void clearNamespaceBacklog(String namespace) throws PulsarAdminException {
         try {
             NamespaceName ns = NamespaceName.get(namespace);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index d45df9c..7b7043f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -224,7 +224,7 @@ public class CmdNamespaces extends CmdBase {
 
         @Parameter(names = "--subscription", description = "Subscription name for which permission will be revoked to roles", required = true)
         private String subscription;
-        
+
         @Parameter(names = "--role", description = "Client role to which revoke permissions", required = true)
         private String role;
 
@@ -234,7 +234,7 @@ public class CmdNamespaces extends CmdBase {
             admin.namespaces().revokePermissionOnSubscription(namespace, subscription, role);
         }
     }
-    
+
     @Parameters(commandDescription = "Get the permissions on a namespace")
     private class Permissions extends CliCommand {
         @Parameter(description = "tenant/namespace\n", required = true)
@@ -548,7 +548,7 @@ public class CmdNamespaces extends CmdBase {
         @Parameter(description = "tenant/namespace\n", required = true)
         private java.util.List<String> params;
 
-        @Parameter(names = { "--sub-msg-dispatch-rate",
+        @Parameter(names = { "--msg-dispatch-rate",
             "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
         private int msgDispatchRate = -1;
 
@@ -580,6 +580,43 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Set replicator message-dispatch-rate for all topics of the namespace")
+    private class SetReplicatorDispatchRate extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--msg-dispatch-rate",
+            "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private int msgDispatchRate = -1;
+
+        @Parameter(names = { "--byte-dispatch-rate",
+            "-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private long byteDispatchRate = -1;
+
+        @Parameter(names = { "--dispatch-rate-period",
+            "-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
+        private int dispatchRatePeriodSec = 1;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setReplicatorDispatchRate(namespace,
+                new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec));
+        }
+    }
+
+    @Parameters(commandDescription = "Get replicator configured message-dispatch-rate for all topics of the namespace (Disabled if value < 0)")
+    private class GetReplicatorDispatchRate extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getReplicatorDispatchRate(namespace));
+        }
+    }
+
     @Parameters(commandDescription = "Get the backlog quota policies for a namespace")
     private class GetBacklogQuotaMap extends CliCommand {
         @Parameter(description = "tenant/namespace\n", required = true)
@@ -1062,10 +1099,10 @@ public class CmdNamespaces extends CmdBase {
         jcommander.addCommand("permissions", new Permissions());
         jcommander.addCommand("grant-permission", new GrantPermissions());
         jcommander.addCommand("revoke-permission", new RevokePermissions());
-        
+
         jcommander.addCommand("grant-subscription-permission", new GrantSubscriptionPermissions());
         jcommander.addCommand("revoke-subscription-permission", new RevokeSubscriptionPermissions());
-        
+
         jcommander.addCommand("set-clusters", new SetReplicationClusters());
         jcommander.addCommand("get-clusters", new GetReplicationClusters());
 
@@ -1102,6 +1139,9 @@ public class CmdNamespaces extends CmdBase {
         jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate());
         jcommander.addCommand("get-subscription-dispatch-rate", new GetSubscriptionDispatchRate());
 
+        jcommander.addCommand("set-replicator-dispatch-rate", new SetReplicatorDispatchRate());
+        jcommander.addCommand("get-replicator-dispatch-rate", new GetReplicatorDispatchRate());
+
         jcommander.addCommand("clear-backlog", new ClearBacklog());
 
         jcommander.addCommand("unsubscribe", new Unsubscribe());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 23728dd..efa18e4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -34,8 +34,9 @@ public class Policies {
     public Set<String> replication_clusters = Sets.newHashSet();
     public BundlesData bundles;
     public Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlog_quota_map = Maps.newHashMap();
-    public Map<String, DispatchRate> clusterDispatchRate = Maps.newHashMap();
+    public Map<String, DispatchRate> topicDispatchRate = Maps.newHashMap();
     public Map<String, DispatchRate> subscriptionDispatchRate = Maps.newHashMap();
+    public Map<String, DispatchRate> replicatorDispatchRate = Maps.newHashMap();
     public Map<String, SubscribeRate> clusterSubscribeRate = Maps.newHashMap();
     public PersistencePolicies persistence = null;
 
@@ -70,7 +71,8 @@ public class Policies {
     @Override
     public int hashCode() {
         return Objects.hash(auth_policies, replication_clusters,
-                backlog_quota_map, clusterDispatchRate,
+                backlog_quota_map,
+                topicDispatchRate, subscriptionDispatchRate, replicatorDispatchRate,
                 clusterSubscribeRate, deduplicationEnabled, persistence,
                 bundles, latency_stats_sample_rate,
                 message_ttl_in_seconds, retention_policies,
@@ -90,7 +92,9 @@ public class Policies {
             return Objects.equals(auth_policies, other.auth_policies)
                     && Objects.equals(replication_clusters, other.replication_clusters)
                     && Objects.equals(backlog_quota_map, other.backlog_quota_map)
-                    && Objects.equals(clusterDispatchRate, other.clusterDispatchRate)
+                    && Objects.equals(topicDispatchRate, other.topicDispatchRate)
+                    && Objects.equals(subscriptionDispatchRate, other.subscriptionDispatchRate)
+                    && Objects.equals(replicatorDispatchRate, other.replicatorDispatchRate)
                     && Objects.equals(clusterSubscribeRate, other.clusterSubscribeRate)
                     && Objects.equals(deduplicationEnabled, other.deduplicationEnabled)
                     && Objects.equals(persistence, other.persistence) && Objects.equals(bundles, other.bundles)
@@ -136,7 +140,9 @@ public class Policies {
                 .add("replication_clusters", replication_clusters).add("bundles", bundles)
                 .add("backlog_quota_map", backlog_quota_map).add("persistence", persistence)
                 .add("deduplicationEnabled", deduplicationEnabled)
-                .add("clusterDispatchRate", clusterDispatchRate)
+                .add("topicDispatchRate", topicDispatchRate)
+                .add("subscriptionDispatchRate", subscriptionDispatchRate)
+                .add("replicatorDispatchRate", replicatorDispatchRate)
                 .add("clusterSubscribeRate", clusterSubscribeRate)
                 .add("latency_stats_sample_rate", latency_stats_sample_rate)
                 .add("antiAffinityGroup", antiAffinityGroup)
diff --git a/site2/docs/admin-api-namespaces.md b/site2/docs/admin-api-namespaces.md
index f2cd929..7cd0c1d 100644
--- a/site2/docs/admin-api-namespaces.md
+++ b/site2/docs/admin-api-namespaces.md
@@ -577,7 +577,7 @@ $ pulsar-admin namespaces set-dispatch-rate test-tenant/ns1 \
 ###### Java
 
 ```java
-admin.namespaces().setDispatchRate(namespace, 1000, 1048576, 1)
+admin.namespaces().setDispatchRate(namespace, new DispatchRate(1000, 1048576, 1))
 ```
 
 #### get configured message-rate
@@ -611,6 +611,122 @@ admin.namespaces().getDispatchRate(namespace)
 ```
 
 
+#### set dispatch throttling for subscription
+
+It sets message dispatch rate for all the subscription of topics under a given namespace.
+Dispatch rate can be restricted by number of message per X seconds (`msg-dispatch-rate`) or by number of message-bytes per X second (`byte-dispatch-rate`).
+dispatch rate is in second and it can be configured with `dispatch-rate-period`. Default value of `msg-dispatch-rate` and `byte-dispatch-rate` is -1 which
+disables the throttling.
+
+###### CLI
+
+```
+$ pulsar-admin namespaces set-subscription-dispatch-rate test-tenant/ns1 \
+  --msg-dispatch-rate 1000 \
+  --byte-dispatch-rate 1048576 \
+  --dispatch-rate-period 1
+```
+
+###### REST
+
+```
+{@inject: endpoint|POST|/admin/v2/namespaces/{tenant}/{namespace}/subscriptionDispatchRate|operation/setDispatchRate}
+```
+
+###### Java
+
+```java
+admin.namespaces().setSubscriptionDispatchRate(namespace, new DispatchRate(1000, 1048576, 1))
+```
+
+#### get configured message-rate
+
+It shows configured message-rate for the namespace (topics under this namespace can dispatch this many messages per second)
+
+###### CLI
+
+```
+$ pulsar-admin namespaces get-subscription-dispatch-rate test-tenant/ns1
+```
+
+```json
+{
+  "dispatchThrottlingRatePerTopicInMsg" : 1000,
+  "dispatchThrottlingRatePerTopicInByte" : 1048576,
+  "ratePeriodInSecond" : 1
+}
+```
+
+###### REST
+
+```
+{@inject: endpoint|GET|/admin/v2/namespaces/{tenant}/{namespace}/subscriptionDispatchRate|operation/getDispatchRate}
+```
+
+###### Java
+
+```java
+admin.namespaces().getSubscriptionDispatchRate(namespace)
+```
+
+#### set dispatch throttling for subscription
+
+It sets message dispatch rate for all the replicator between replication clusters under a given namespace.
+Dispatch rate can be restricted by number of message per X seconds (`msg-dispatch-rate`) or by number of message-bytes per X second (`byte-dispatch-rate`).
+dispatch rate is in second and it can be configured with `dispatch-rate-period`. Default value of `msg-dispatch-rate` and `byte-dispatch-rate` is -1 which
+disables the throttling.
+
+###### CLI
+
+```
+$ pulsar-admin namespaces set-replicator-dispatch-rate test-tenant/ns1 \
+  --msg-dispatch-rate 1000 \
+  --byte-dispatch-rate 1048576 \
+  --dispatch-rate-period 1
+```
+
+###### REST
+
+```
+{@inject: endpoint|POST|/admin/v2/namespaces/{tenant}/{namespace}/replicatorDispatchRate|operation/setDispatchRate}
+```
+
+###### Java
+
+```java
+admin.namespaces().setReplicatorDispatchRate(namespace, new DispatchRate(1000, 1048576, 1))
+```
+
+#### get configured message-rate
+
+It shows configured message-rate for the namespace (topics under this namespace can dispatch this many messages per second)
+
+###### CLI
+
+```
+$ pulsar-admin namespaces get-replicator-dispatch-rate test-tenant/ns1
+```
+
+```json
+{
+  "dispatchThrottlingRatePerTopicInMsg" : 1000,
+  "dispatchThrottlingRatePerTopicInByte" : 1048576,
+  "ratePeriodInSecond" : 1
+}
+```
+
+###### REST
+
+```
+{@inject: endpoint|GET|/admin/v2/namespaces/{tenant}/{namespace}/replicatorDispatchRate|operation/getDispatchRate}
+```
+
+###### Java
+
+```java
+admin.namespaces().getReplicatorDispatchRate(namespace)
+```
+
 ### Namespace isolation
 
 Coming soon.