You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/01/14 09:47:40 UTC

[pulsar] branch master updated: Support dispatch rate policy at the topic level (#9175)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b65fab  Support dispatch rate policy at the topic level (#9175)
7b65fab is described below

commit 7b65fab167ab776d78fbfd7482ca59fa2276f495
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Jan 14 17:47:07 2021 +0800

    Support dispatch rate policy at the topic level (#9175)
    
    Fixes #9143
    
    ### Motivation
    The dispatch rate police is supported at the namespace level
    But does not support at the topic level since we supported topic level policy at 2.7.0
    
    ### Modifications
    add API
    
    ### Verifying this change
    ReplicatorRateLimiterTest#testReplicatorRatePriority
    TopicPoliciesTest#testReplicatorRateApi
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 12 ++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 84 ++++++++++++++++++++++
 .../service/persistent/DispatchRateLimiter.java    | 12 +++-
 .../broker/service/persistent/PersistentTopic.java |  8 ++-
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 21 ++++++
 .../broker/service/ReplicatorRateLimiterTest.java  | 74 ++++++++++++++++++-
 .../org/apache/pulsar/client/admin/Topics.java     | 64 +++++++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 76 ++++++++++++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      | 10 +++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 57 +++++++++++++++
 .../pulsar/common/policies/data/TopicPolicies.java |  5 ++
 11 files changed, 418 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8503459..54b98b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2725,6 +2725,18 @@ public class PersistentTopicsBase extends AdminResource {
         return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
     }
 
+    protected Optional<DispatchRate> internalGetReplicatorDispatchRate() {
+        preValidation();
+        return getTopicPolicies(topicName).map(TopicPolicies::getReplicatorDispatchRate);
+    }
+
+    protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRate dispatchRate) {
+        preValidation();
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
+        topicPolicies.setReplicatorDispatchRate(dispatchRate);
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+    }
+
     private void preValidation() {
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 888a478..6217371 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1903,6 +1903,90 @@ public class PersistentTopics extends PersistentTopicsBase {
     }
 
     @GET
+    @Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
+    @ApiOperation(value = "Get replicatorDispatchRate config for specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
+                                          @PathParam("tenant") String tenant,
+                                          @PathParam("namespace") String namespace,
+                                          @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            Optional<DispatchRate> dispatchRate = internalGetReplicatorDispatchRate();
+            if (dispatchRate.isPresent()) {
+                asyncResponse.resume(dispatchRate.get());
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+            }
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
+    @ApiOperation(value = "Set replicatorDispatchRate config for specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "Invalid value of replicatorDispatchRate")})
+    public void setReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
+                                          @PathParam("tenant") String tenant,
+                                          @PathParam("namespace") String namespace,
+                                          @PathParam("topic") @Encoded String encodedTopic,
+                                          @ApiParam(value = "Replicator dispatch rate of the topic")
+                                                      DispatchRate dispatchRate) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetReplicatorDispatchRate(dispatchRate).whenComplete((r, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Updating replicatorDispatchRate failed", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Updating replicatorDispatchRate failed", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}"
+                                + ", replicatorDispatchRate={}"
+                        , clientAppId(), namespaceName, topicName.getLocalName(), dispatchRate);
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
+    @ApiOperation(value = "Remove replicatorDispatchRate config for specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
+                                               @PathParam("tenant") String tenant,
+                                               @PathParam("namespace") String namespace,
+                                               @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetReplicatorDispatchRate(null).whenComplete((r, ex) -> {
+            if (ex != null) {
+                log.error("Failed to remove replicatorDispatchRate", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}",
+                        clientAppId(), namespaceName, topicName.getLocalName());
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @GET
     @Path("/{tenant}/{namespace}/{topic}/maxProducers")
     @ApiOperation(value = "Get maxProducers config for specified topic.")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 7fb4d43..285d587 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -143,8 +143,9 @@ public class DispatchRateLimiter {
     }
 
     /**
-     * Update dispatch-throttling-rate. gives first priority to namespace-policy configured dispatch rate else applies
-     * default broker dispatch-throttling-rate
+     * Update dispatch-throttling-rate.
+     * Topic-level has the highest priority, then namespace-level, and finally use dispatch-throttling-rate in
+     * broker-level
      */
     public void updateDispatchRate() {
         Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
@@ -189,6 +190,13 @@ public class DispatchRateLimiter {
                                 .getTopicPolicies(TopicName.get(topicName)))
                                 .map(TopicPolicies::getSubscriptionDispatchRate);
                         break;
+                    case REPLICATOR:
+                        dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
+                                .getTopicPolicies(TopicName.get(topicName)))
+                                .map(TopicPolicies::getReplicatorDispatchRate);
+                        break;
+                    default:
+                        break;
                 }
             } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
                 log.debug("Topic {} policies cache have not init.", topicName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9f8a447..0e0cc8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -497,11 +497,13 @@ public class PersistentTopic extends AbstractTopic
         });
     }
 
+    @Override
     protected CompletableFuture<Long> incrementTopicEpoch(Optional<Long> currentEpoch) {
         long newEpoch = currentEpoch.orElse(-1L) + 1;
         return setTopicEpoch(newEpoch);
     }
 
+    @Override
     protected CompletableFuture<Long> setTopicEpoch(long newEpoch) {
         CompletableFuture<Long> future = new CompletableFuture<>();
         ledger.asyncSetProperty(TOPIC_EPOCH_PROPERTY_NAME, String.valueOf(newEpoch), new UpdatePropertiesCallback() {
@@ -2059,7 +2061,7 @@ public class PersistentTopic extends AbstractTopic
             }
         });
         replicators.forEach((name, replicator) ->
-            replicator.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data))
+                replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
         );
         checkMessageExpiry();
         CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
@@ -2662,10 +2664,12 @@ public class PersistentTopic extends AbstractTopic
         }
 
         initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
-        if (this.subscribeRateLimiter.isPresent() && policies != null) {
+        if (this.subscribeRateLimiter.isPresent()) {
             subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
                 subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
         }
+        replicators.forEach((name, replicator) -> replicator.getRateLimiter()
+                .ifPresent(DispatchRateLimiter::updateDispatchRate));
     }
 
     private Optional<Policies> getNamespacePolicies() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index f6e9eda..e8d09d7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -1387,4 +1387,25 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
             c.close();
         }
     }
+
+    @Test(timeOut = 30000)
+    public void testReplicatorRateApi() throws Exception {
+        final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
+        // init cache
+        pulsarClient.newProducer().topic(topic).create().close();
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+
+        assertNull(admin.topics().getReplicatorDispatchRate(topic));
+
+        DispatchRate dispatchRate = new DispatchRate(100,200L,10);
+        admin.topics().setReplicatorDispatchRate(topic, dispatchRate);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
+                -> assertEquals(admin.topics().getReplicatorDispatchRate(topic), dispatchRate));
+
+        admin.topics().removeReplicatorDispatchRate(topic);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
+                -> assertNull(admin.topics().getReplicatorDispatchRate(topic)));
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
index 4c04ccf..9e68cc5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
@@ -31,7 +31,9 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -41,6 +43,10 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
 /**
  * Starts 3 brokers that are in 3 different clusters
  */
@@ -74,6 +80,73 @@ public class ReplicatorRateLimiterTest extends ReplicatorTestBase {
         return new Object[][] { { DispatchRateType.messageRate }, { DispatchRateType.byteRate } };
     }
 
+    @Test
+    public void testReplicatorRatePriority() throws Exception {
+        shutdown();
+        config1.setSystemTopicEnabled(true);
+        config1.setTopicLevelPoliciesEnabled(true);
+        config1.setDispatchThrottlingRatePerReplicatorInMsg(100);
+        config1.setDispatchThrottlingRatePerReplicatorInByte(200L);
+        setup();
+
+        final String namespace = "pulsar/replicatorchange-" + System.currentTimeMillis();
+        final String topicName = "persistent://" + namespace + "/ratechange";
+
+        admin1.namespaces().createNamespace(namespace);
+        // set 2 clusters, there will be 1 replicator in each topic
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS).build();
+        client1.newProducer().topic(topicName).create().close();
+        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+        Awaitility.await().atMost(3, TimeUnit.SECONDS)
+                .until(() -> pulsar1.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));
+
+        //use broker-level by default
+        assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 100);
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 200L);
+
+        //set namespace-level policy, which should take effect
+        DispatchRate nsDispatchRate = new DispatchRate(50, 60L, 70);
+        admin1.namespaces().setReplicatorDispatchRate(namespace, nsDispatchRate);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), nsDispatchRate));
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 50);
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 60L);
+
+        //set topic-level policy, which should take effect
+        DispatchRate topicRate = new DispatchRate(10, 20L, 30);
+        admin1.topics().setReplicatorDispatchRate(topicName, topicRate);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
+                assertEquals(admin1.topics().getReplicatorDispatchRate(topicName), topicRate));
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10);
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);
+
+        //Set the namespace-level policy, which should not take effect
+        DispatchRate nsDispatchRate2 = new DispatchRate(500, 600L, 700);
+        admin1.namespaces().setReplicatorDispatchRate(namespace, nsDispatchRate2);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), nsDispatchRate2));
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);
+
+        //remove topic-level policy, namespace-level should take effect
+        admin1.topics().removeReplicatorDispatchRate(topicName);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
+                assertNull(admin1.topics().getReplicatorDispatchRate(topicName)));
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 500);
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
+                600L);
+
+        //remove namespace-level policy, broker-level should take effect
+        admin1.namespaces().setReplicatorDispatchRate(namespace, null);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
+                assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 100));
+        assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
+                200L);
+    }
+
     /**
      * verifies dispatch rate for replicators get changed once namespace policies changed.
      *
@@ -103,7 +176,6 @@ public class ReplicatorRateLimiterTest extends ReplicatorTestBase {
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         producer.close();
-
         PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
 
         // 1. default replicator throttling not configured
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 3c68986..3f275d4 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2162,6 +2162,70 @@ public interface Topics {
     CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String topic);
 
     /**
+     * Set replicatorDispatchRate for the topic.
+     * <p/>
+     * Replicator dispatch rate under this topic can dispatch this many messages per second
+     *
+     * @param topic
+     * @param dispatchRate
+     *            number of messages per second
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setReplicatorDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException;
+
+    /**
+     * Set replicatorDispatchRate for the topic asynchronously.
+     * <p/>
+     * Replicator dispatch rate under this topic can dispatch this many messages per second.
+     *
+     * @param topic
+     * @param dispatchRate
+     *            number of messages per second
+     */
+    CompletableFuture<Void> setReplicatorDispatchRateAsync(String topic, DispatchRate dispatchRate);
+
+    /**
+     * Get replicatorDispatchRate for the topic.
+     * <p/>
+     * Replicator dispatch rate under this topic can dispatch this many messages per second.
+     *
+     * @param topic
+     * @returns DispatchRate
+     *            number of messages per second
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    DispatchRate getReplicatorDispatchRate(String topic) throws PulsarAdminException;
+
+    /**
+     * Get replicatorDispatchRate asynchronously.
+     * <p/>
+     * Replicator dispatch rate under this topic can dispatch this many messages per second.
+     *
+     * @param topic
+     * @returns DispatchRate
+     *            number of messages per second
+     */
+    CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String topic);
+
+    /**
+     * Remove replicatorDispatchRate for a topic.
+     * @param topic
+     *            Topic name
+     * @throws PulsarAdminException
+     *            Unexpected error
+     */
+    void removeReplicatorDispatchRate(String topic) throws PulsarAdminException;
+
+    /**
+     * Remove replicatorDispatchRate for a topic asynchronously.
+     * @param topic
+     *            Topic name
+     */
+    CompletableFuture<Void> removeReplicatorDispatchRateAsync(String topic);
+
+    /**
      * Get the compactionThreshold for a topic. The maximum number of bytes
      * can have before compaction is triggered. 0 disables.
      * <p/>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 617ed32..eda3443 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2939,6 +2939,82 @@ public class TopicsImpl extends BaseResource implements Topics {
     }
 
     @Override
+    public DispatchRate getReplicatorDispatchRate(String topic) throws PulsarAdminException {
+        try {
+            return getReplicatorDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "replicatorDispatchRate");
+        final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<DispatchRate>() {
+                    @Override
+                    public void completed(DispatchRate dispatchRate) {
+                        future.complete(dispatchRate);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void setReplicatorDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException {
+        try {
+            setReplicatorDispatchRateAsync(topic, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setReplicatorDispatchRateAsync(String topic, DispatchRate dispatchRate) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "replicatorDispatchRate");
+        return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void removeReplicatorDispatchRate(String topic) throws PulsarAdminException {
+        try {
+            removeReplicatorDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeReplicatorDispatchRateAsync(String topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "replicatorDispatchRate");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public SubscribeRate getSubscribeRate(String topic) throws PulsarAdminException {
         try {
             return getSubscribeRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 210d9d7..477f032 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -770,6 +770,16 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("disable-deduplication persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).enableDeduplication("persistent://myprop/clust/ns1/ds1", false);
 
+        cmdTopics.run(split("get-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).getReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1");
+
+        cmdTopics.run(split("set-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1 -md 10 -bd 11 -dt 12"));
+        verify(mockTopics).setReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1",
+                new DispatchRate(10,11,12));
+
+        cmdTopics.run(split("remove-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).removeReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1");
+
         cmdTopics.run(split("get-deduplication-enabled persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).getDeduplicationEnabled("persistent://myprop/clust/ns1/ds1");
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 8488e17..fff4a59 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -146,6 +146,10 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate());
         jcommander.addCommand("remove-subscription-dispatch-rate", new RemoveSubscriptionDispatchRate());
 
+        jcommander.addCommand("get-replicator-dispatch-rate", new GetReplicatorDispatchRate());
+        jcommander.addCommand("set-replicator-dispatch-rate", new SetReplicatorDispatchRate());
+        jcommander.addCommand("remove-replicator-dispatch-rate", new RemoveReplicatorDispatchRate());
+
         jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold());
         jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold());
         jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold());
@@ -1682,6 +1686,59 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get replicator message-dispatch-rate for a topic")
+    private class GetReplicatorDispatchRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String topic = validatePersistentTopic(params);
+            print(admin.topics().getReplicatorDispatchRate(topic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set replicator message-dispatch-rate for a topic")
+    private class SetReplicatorDispatchRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--msg-dispatch-rate",
+            "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private int msgDispatchRate = -1;
+
+        @Parameter(names = { "--byte-dispatch-rate",
+            "-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private long byteDispatchRate = -1;
+
+        @Parameter(names = { "--dispatch-rate-period",
+            "-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
+        private int dispatchRatePeriodSec = 1;
+
+        @Parameter(names = { "--relative-to-publish-rate",
+                "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
+        private boolean relativeToPublishRate = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().setReplicatorDispatchRate(persistentTopic,
+                    new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove replicator message-dispatch-rate for a topic")
+    private class RemoveReplicatorDispatchRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().removeReplicatorDispatchRate(persistentTopic);
+        }
+    }
+
     @Parameters(commandDescription = "Get max number of producers for a topic")
     private class GetMaxProducers extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index a6c7ebf..3c0c78e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -57,6 +57,11 @@ public class TopicPolicies {
     private Integer deduplicationSnapshotIntervalSeconds = null;
     private Integer maxMessageSize = null;
     private Integer maxSubscriptionsPerTopic = null;
+    private DispatchRate replicatorDispatchRate = null;
+
+    public boolean isReplicatorDispatchRateSet() {
+        return replicatorDispatchRate != null;
+    }
 
     public boolean isMaxSubscriptionsPerTopicSet() {
         return maxSubscriptionsPerTopic != null;