You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/09/21 23:47:35 UTC

[pulsar] branch master updated: support subscription dispatch rate on topic level (#8087)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c508c5  support subscription dispatch rate on topic level (#8087)
4c508c5 is described below

commit 4c508c586fb9dcd08d7e13dfe4aeca19a790f51c
Author: hangc0276 <ha...@163.com>
AuthorDate: Tue Sep 22 07:47:21 2020 +0800

    support subscription dispatch rate on topic level (#8087)
    
    ### Modifications
    Support set subscription dispatch rate on topic level.
    Support get subscription dispatch rate on topic level.
    Support remove subscription dispatch rate on topic level.
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  42 +++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  87 ++++++++++++++
 .../service/persistent/DispatchRateLimiter.java    |  34 +++---
 .../broker/service/persistent/PersistentTopic.java |  16 ++-
 .../broker/admin/TopicPoliciesDisableTest.java     |  21 ++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 128 +++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     |  64 +++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  76 ++++++++++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  59 ++++++++++
 .../pulsar/common/policies/data/TopicPolicies.java |   5 +
 10 files changed, 518 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 749c493..13c592e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3261,6 +3261,48 @@ public class PersistentTopicsBase extends AdminResource {
 
     }
 
+    protected Optional<DispatchRate> internalGetSubscriptionDispatchRate() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        return getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionDispatchRate);
+    }
+
+    protected CompletableFuture<Void> internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        if (dispatchRate == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        TopicPolicies topicPolicies = getTopicPolicies(topicName)
+            .orElseGet(TopicPolicies::new);
+        topicPolicies.setSubscriptionDispatchRate(dispatchRate);
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+    }
+
+    protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
+        if (!topicPolicies.isPresent()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        topicPolicies.get().setSubscriptionDispatchRate(null);
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+    }
+
+
     protected Optional<Integer> internalGetMaxConsumersPerSubscription() {
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 5e93c91..6762538 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2088,6 +2088,93 @@ public class PersistentTopics extends PersistentTopicsBase {
     }
 
     @GET
+    @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
+    @ApiOperation(value = "Get subscription message dispatch rate configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            Optional<DispatchRate> dispatchRate = internalGetSubscriptionDispatchRate();
+            if (!dispatchRate.isPresent()) {
+                asyncResponse.resume(Response.noContent().build());
+            } else {
+                asyncResponse.resume(dispatchRate.get());
+            }
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
+    @ApiOperation(value = "Set subscription message dispatch rate configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void setSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse,
+                                @PathParam("tenant") String tenant,
+                                @PathParam("namespace") String namespace,
+                                @PathParam("topic") @Encoded String encodedTopic,
+                                @ApiParam(value = "Subscription message dispatch rate for the specified topic") DispatchRate dispatchRate) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetSubscriptionDispatchRate(dispatchRate).whenComplete((r, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed to set topic: {} subscription dispatch rate", topicName.getLocalName(), ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed to set topic: {} subscription dispatch rate", topicName.getLocalName(), ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                try {
+                    log.info("[{}] Successfully set topic subscription dispatch rate: tenant={}, namespace={}, topic={}, dispatchRate={}",
+                        clientAppId(),
+                        tenant,
+                        namespace,
+                        topicName.getLocalName(),
+                        jsonMapper().writeValueAsString(dispatchRate));
+                } catch (JsonProcessingException ignore) {}
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
+    @ApiOperation(value = "Remove subscription message dispatch rate configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+        @ApiResponse(code = 404, message = "Topic does not exist"),
+        @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+        @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse,
+                                   @PathParam("tenant") String tenant,
+                                   @PathParam("namespace") String namespace,
+                                   @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalRemoveSubscriptionDispatchRate().whenComplete((r, ex) -> {
+            if (ex != null) {
+                log.error("Failed to remove topic: {} subscription dispatch rate", topicName.getLocalName(), ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully remove topic subscription dispatch rate: tenant={}, namespace={}, topic={}",
+                    clientAppId(),
+                    tenant,
+                    namespace,
+                    topicName.getLocalName());
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @GET
     @Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
     @ApiOperation(value = "Get compaction threshold configuration for specified topic.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 4cc711f..29d1107 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -149,7 +149,7 @@ public class DispatchRateLimiter {
      * default broker dispatch-throttling-rate
      */
     public void updateDispatchRate() {
-        Optional<DispatchRate> dispatchRate = getSystemTopicDispatchRate(brokerService, topicName);
+        Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
         if (!dispatchRate.isPresent()) {
             dispatchRate =Optional.ofNullable(getPoliciesDispatchRate(brokerService));
 
@@ -165,29 +165,37 @@ public class DispatchRateLimiter {
     public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
             String topicName, Type type) {
         final ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
-        if (serviceConfig.isTopicLevelPoliciesEnabled() && type == Type.TOPIC) {
-            Optional<DispatchRate> dispatchRate = getSystemTopicDispatchRate(brokerService, topicName);
-            if (dispatchRate.isPresent()) {
-                return true;
-            }
+        Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
+        if (dispatchRate.isPresent()) {
+            return true;
         }
 
         policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName);
         return isDispatchRateNeeded(serviceConfig, policies, topicName, type);
     }
 
-    public static Optional<DispatchRate> getSystemTopicDispatchRate(BrokerService brokerService, String topicName) {
+    public static Optional<DispatchRate> getTopicPolicyDispatchRate(BrokerService brokerService,
+                                                                    String topicName, Type type) {
         Optional<DispatchRate> dispatchRate = Optional.empty();
         final ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
-        if (serviceConfiguration.isTopicLevelPoliciesEnabled()) {
+        if (serviceConfiguration.isSystemTopicEnabled() && serviceConfiguration.isTopicLevelPoliciesEnabled()) {
             try {
-                dispatchRate = Optional.ofNullable(brokerService.pulsar()
-                    .getTopicPoliciesService().getTopicPolicies(TopicName.get(topicName)))
-                    .map(TopicPolicies::getDispatchRate);
-            } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e){
+                switch (type) {
+                    case TOPIC:
+                        dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
+                                .getTopicPolicies(TopicName.get(topicName)))
+                                .map(TopicPolicies::getDispatchRate);
+                        break;
+                    case SUBSCRIPTION:
+                        dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
+                                .getTopicPolicies(TopicName.get(topicName)))
+                                .map(TopicPolicies::getSubscriptionDispatchRate);
+                        break;
+                }
+            } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
                 log.debug("Topic {} policies cache have not init.", topicName);
             } catch (Exception e) {
-                log.debug("[{}] Failed to get topic policies. Exception: {}", topicName, e);
+                log.debug("[{}] Failed to get topic dispatch rate. ", topicName, e);
             }
         }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f83677e..91c8b7d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1873,7 +1873,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         subscriptions.forEach((subName, sub) -> {
             sub.getConsumers().forEach(Consumer::checkPermissions);
             Dispatcher dispatcher = sub.getDispatcher();
-            if (dispatcher != null) {
+            // If the topic-level policy already exists, the namespace-level policy cannot override
+            // the topic-level policy.
+            if (dispatcher != null && (topicPolicies == null || !topicPolicies.isSubscriptionDispatchRateSet())) {
                 dispatcher.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data));
             }
         });
@@ -2389,6 +2391,18 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             }
         });
 
+        subscriptions.forEach((subName, sub) -> {
+            sub.getConsumers().forEach(Consumer::checkPermissions);
+            Dispatcher dispatcher = sub.getDispatcher();
+            if (dispatcher != null && policies.isSubscriptionDispatchRateSet()) {
+                dispatcher.getRateLimiter().ifPresent(rateLimiter ->
+                        rateLimiter.updateDispatchRate(policies.getSubscriptionDispatchRate()));
+            } else if (dispatcher != null) {
+                dispatcher.getRateLimiter().ifPresent(rateLimiter ->
+                        rateLimiter.updateDispatchRate());
+            }
+        });
+
         if (policies.getPublishRate() != null) {
             topicPolicyPublishRate = policies.getPublishRate();
             updateTopicPublishDispatcher();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
index f0f8d79..2309107 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
@@ -154,6 +154,27 @@ public class TopicPoliciesDisableTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testSubscriptionDispatchRateDisabled() throws Exception {
+        DispatchRate dispatchRate = new DispatchRate(1000,
+                1020*1024, 1);
+        log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic);
+
+        try {
+            admin.topics().setSubscriptionDispatchRate(testTopic, dispatchRate);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+
+        try {
+            admin.topics().getSubscriptionDispatchRate(testTopic);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+    }
+
+    @Test
     public void testCompactionThresholdDisabled() {
         Long compactionThreshold = 10000L;
         log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index c0e7655..803cc36 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -561,6 +561,134 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testGetSetSubscriptionDispatchRate() throws Exception {
+        final String topic = testTopic + UUID.randomUUID();
+        admin.topics().createNonPartitionedTopic(topic);
+        Producer producer = pulsarClient.newProducer().topic(topic).create();
+        producer.close();
+        Thread.sleep(3000);
+
+        DispatchRate dispatchRate = new DispatchRate(1000,
+                1024 * 1024, 1);
+        log.info("Subscription Dispatch Rate: {} will set to the topic: {}", dispatchRate, topic);
+
+        admin.topics().setSubscriptionDispatchRate(topic, dispatchRate);
+        log.info("Subscription dispatch rate set success on topic: {}", topic);
+
+        Thread.sleep(3000);
+
+        String subscriptionName = "test_subscription_rate";
+        Consumer consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();
+        Thread.sleep(3000);
+
+        DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
+                .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+        Assert.assertNotNull(dispatchRateLimiter);
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte);
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg);
+
+
+        DispatchRate getDispatchRate = admin.topics().getSubscriptionDispatchRate(topic);
+        log.info("Subscription dispatch rate: {} get on topic: {}", getDispatchRate, topic);
+        Assert.assertEquals(getDispatchRate, dispatchRate);
+
+        producer.close();
+        admin.topics().delete(topic, true);
+    }
+
+    @Test
+    public void testRemoveSubscriptionDispatchRate() throws Exception {
+        final String topic = testTopic + UUID.randomUUID();
+        admin.topics().createNonPartitionedTopic(topic);
+        Producer producer = pulsarClient.newProducer().topic(topic).create();
+        producer.close();
+        Thread.sleep(3000);
+
+        DispatchRate dispatchRate = new DispatchRate(1000,
+                1024 * 1024, 1);
+        log.info("Subscription Dispatch Rate: {} will set to the topic: {}", dispatchRate, topic);
+
+        admin.topics().setSubscriptionDispatchRate(topic, dispatchRate);
+        log.info("Subscription dispatch rate set success on topic: {}", topic);
+
+        Thread.sleep(3000);
+
+        String subscriptionName = "test_subscription_rate";
+        Consumer consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();
+        Thread.sleep(3000);
+
+        DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
+                .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+        Assert.assertNotNull(dispatchRateLimiter);
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte);
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg);
+
+        DispatchRate getDispatchRate = admin.topics().getSubscriptionDispatchRate(topic);
+        log.info("Subscription dispatch rate: {} get on topic: {}", getDispatchRate, topic);
+
+        // remove subscription dispatch rate
+        admin.topics().removeSubscriptionDispatchRate(topic);
+        Thread.sleep(3000);
+        getDispatchRate = admin.topics().getSubscriptionDispatchRate(topic);
+        log.info("Subscription dispatch rate get on topic is {} after remove", getDispatchRate);
+        Assert.assertNull(getDispatchRate);
+
+        dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
+                .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+        Assert.assertNotEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg);
+        Assert.assertNotEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte);
+
+        producer.close();
+        admin.topics().delete(topic, true);
+    }
+
+    @Test
+    public void testSubscriptionDispatchRatePolicyOverwrittenNamespaceLevel() throws Exception {
+        final String topic = testTopic + UUID.randomUUID();
+        admin.topics().createNonPartitionedTopic(topic);
+        Producer producer = pulsarClient.newProducer().topic(topic).create();
+        producer.close();
+        Thread.sleep(3000);
+
+        // set namespace level subscription dispatch rate
+        DispatchRate namespaceDispatchRate = new DispatchRate(100, 1024 * 1024, 1);
+        admin.namespaces().setSubscriptionDispatchRate(myNamespace, namespaceDispatchRate);
+        Thread.sleep(3000);
+
+        String subscriptionName = "test_subscription_rate";
+        Consumer consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();
+
+        // get subscription dispatch Rate limiter
+        DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
+                .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), namespaceDispatchRate.dispatchThrottlingRateInMsg);
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), namespaceDispatchRate.dispatchThrottlingRateInByte);
+
+        // set topic level subscription dispatch rate
+        DispatchRate topicDispatchRate = new DispatchRate(200, 2 * 1024 * 1024, 1);
+        admin.topics().setSubscriptionDispatchRate(topic, topicDispatchRate);
+        Thread.sleep(3000);
+
+        // get subscription dispatch rate limiter
+        dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get()
+                .getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), topicDispatchRate.dispatchThrottlingRateInByte);
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), topicDispatchRate.dispatchThrottlingRateInMsg);
+
+        // remove topic level subscription dispatch rate limiter
+        admin.topics().removeSubscriptionDispatchRate(topic);
+        Thread.sleep(3000);
+
+        // get subscription dispatch rate limiter
+        dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get()
+                .getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), namespaceDispatchRate.dispatchThrottlingRateInByte);
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), namespaceDispatchRate.dispatchThrottlingRateInMsg);
+
+        admin.topics().delete(topic, true);
+    }
+
+    @Test
     public void testGetSetCompactionThreshold() throws Exception {
         long compactionThreshold = 100000;
         log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 1994ee2..e039083 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2008,6 +2008,70 @@ public interface Topics {
     CompletableFuture<Void> removeDispatchRateAsync(String topic) throws PulsarAdminException;
 
     /**
+     * Set subscription-message-dispatch-rate for the topic.
+     * <p/>
+     * Each subscription on this topic is throttled to the given dispatch rate.
+     *
+     * @param topic Topic name
+     * @param dispatchRate
+     *            message and byte dispatch throttling limits to apply
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setSubscriptionDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException;
+
+    /**
+     * Set subscription-message-dispatch-rate for the topic asynchronously.
+     * <p/>
+     * Each subscription on this topic is throttled to the given dispatch rate.
+     *
+     * @param topic Topic name
+     * @param dispatchRate
+     *            message and byte dispatch throttling limits to apply
+     */
+    CompletableFuture<Void> setSubscriptionDispatchRateAsync(String topic, DispatchRate dispatchRate);
+
+    /**
+     * Get subscription-message-dispatch-rate for the topic.
+     * <p/>
+     * Each subscription on this topic is throttled to the returned dispatch rate.
+     *
+     * @param topic Topic name
+     * @return the subscription dispatch rate configured for the topic,
+     *            or null if none is set
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdminException;
+
+    /**
+     * Get subscription-message-dispatch-rate asynchronously.
+     * <p/>
+     * Each subscription on this topic is throttled to the returned dispatch rate.
+     *
+     * @param topic Topic name
+     * @return a future completed with the subscription dispatch rate configured
+     *            for the topic, or with null if none is set
+     */
+    CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String topic);
+
+    /**
+     * Remove subscription-message-dispatch-rate for a topic.
+     * @param topic
+     *            Topic name
+     * @throws PulsarAdminException
+     *            Unexpected error
+     */
+    void removeSubscriptionDispatchRate(String topic) throws PulsarAdminException;
+
+    /**
+     * Remove subscription-message-dispatch-rate for a topic asynchronously.
+     * @param topic
+     *            Topic name
+     */
+    CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String topic);
+
+    /**
      * Get the compactionThreshold for a topic. The maximum number of bytes
      * can have before compaction is triggered. 0 disables.
      * <p/>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index eb30d9c..9238c07 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2165,6 +2165,82 @@ public class TopicsImpl extends BaseResource implements Topics {
     }
 
     @Override
+    public DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdminException {
+        try {
+            return getSubscriptionDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "subscriptionDispatchRate");
+        final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<DispatchRate>() {
+                    @Override
+                    public void completed(DispatchRate dispatchRate) {
+                        future.complete(dispatchRate);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void setSubscriptionDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException {
+        try {
+            setSubscriptionDispatchRateAsync(topic, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setSubscriptionDispatchRateAsync(String topic, DispatchRate dispatchRate) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "subscriptionDispatchRate");
+        return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void removeSubscriptionDispatchRate(String topic) throws PulsarAdminException {
+        try {
+            removeSubscriptionDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "subscriptionDispatchRate");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public Long getCompactionThreshold(String topic) throws PulsarAdminException {
         try {
             return getCompactionThresholdAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 9649cad..1c6a131 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -133,9 +133,15 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("get-offload-policies", new GetOffloadPolicies());
         jcommander.addCommand("set-offload-policies", new SetOffloadPolicies());
         jcommander.addCommand("remove-offload-policies", new RemoveOffloadPolicies());
+
         jcommander.addCommand("get-dispatch-rate", new GetDispatchRate());
         jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
         jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());
+
+        jcommander.addCommand("get-subscription-dispatch-rate", new GetSubscriptionDispatchRate());
+        jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate());
+        jcommander.addCommand("remove-subscription-dispatch-rate", new RemoveSubscriptionDispatchRate());
+
         jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold());
         jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold());
         jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold());
@@ -1478,6 +1484,59 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    /**
+     * CLI command that prints the subscription dispatch rate the admin API returns for a topic.
+     */
+    @Parameters(commandDescription = "Get subscription message-dispatch-rate for a topic")
+    private class GetSubscriptionDispatchRate extends CliCommand {
+        @Parameter(required = true, description = "persistent://tenant/namespace/topic")
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            // Validate the positional argument first, then query and print the result.
+            final String topic = validatePersistentTopic(params);
+            print(admin.topics().getSubscriptionDispatchRate(topic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set subscription message-dispatch-rate for a topic")
+    private class SetSubscriptionDispatchRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--msg-dispatch-rate",
+            "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private int msgDispatchRate = -1;
+
+        @Parameter(names = { "--byte-dispatch-rate",
+            "-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private long byteDispatchRate = -1;
+
+        @Parameter(names = { "--dispatch-rate-period",
+            "-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
+        private int dispatchRatePeriodSec = 1;
+
+        @Parameter(names = { "--relative-to-publish-rate",
+                "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
+        private boolean relativeToPublishRate = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().setSubscriptionDispatchRate(persistentTopic,
+                    new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
+        }
+    }
+
+    /**
+     * CLI command that asks the admin API to remove the subscription dispatch rate of a topic.
+     */
+    @Parameters(commandDescription = "Remove subscription message-dispatch-rate for a topic")
+    private class RemoveSubscriptionDispatchRate extends CliCommand {
+        @Parameter(required = true, description = "persistent://tenant/namespace/topic")
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            // Validate the positional argument first, then issue the removal.
+            final String topic = validatePersistentTopic(params);
+            admin.topics().removeSubscriptionDispatchRate(topic);
+        }
+    }
+
     @Parameters(commandDescription = "Get max number of producers for a topic")
     private class GetMaxProducers extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index fb785b6..0e3440b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -52,6 +52,7 @@ public class TopicPolicies {
     private OffloadPolicies offloadPolicies;
     private InactiveTopicPolicies inactiveTopicPolicies = null;
     private DispatchRate dispatchRate = null;
+    // Dispatch rate for subscriptions in these topic policies; null means no value is set
+    // (see isSubscriptionDispatchRateSet()).
+    private DispatchRate subscriptionDispatchRate = null;
     private Long compactionThreshold = null;
     private PublishRate publishRate = null;
     private SubscribeRate subscribeRate = null;
@@ -116,6 +117,10 @@ public class TopicPolicies {
         return dispatchRate != null;
     }
 
+    /**
+     * Reports whether a subscription dispatch rate is present in these policies.
+     *
+     * @return {@code true} when {@code subscriptionDispatchRate} is non-null
+     */
+    public boolean isSubscriptionDispatchRateSet() {
+        return subscriptionDispatchRate != null;
+    }
+
     public boolean isCompactionThresholdSet() {
         return compactionThreshold != null;
     }