You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/09/21 03:36:55 UTC

[pulsar] branch master updated: Support subscribe rate on topic level (#7991)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new afe6413  Support subscribe rate on topic level (#7991)
afe6413 is described below

commit afe641350838b0a215b53ca5fbcb7990d7b45857
Author: hangc0276 <ha...@163.com>
AuthorDate: Mon Sep 21 11:34:43 2020 +0800

    Support subscribe rate on topic level (#7991)
    
    Support set subscribe rate on topic level.
    Support get subscribe rate on topic level.
    Support remove subscribe rate on topic level.
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  43 +++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  88 ++++++++++
 .../broker/service/persistent/PersistentTopic.java |  32 +++-
 .../service/persistent/SubscribeRateLimiter.java   |  76 ++++++---
 .../broker/admin/TopicPoliciesDisableTest.java     |  21 +++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 179 +++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     |  64 +++++++-
 .../pulsar/client/admin/internal/TopicsImpl.java   |  78 +++++++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  51 ++++++
 .../pulsar/common/policies/data/TopicPolicies.java |   5 +
 10 files changed, 614 insertions(+), 23 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9a81e9a..749c493 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin.impl;
 
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import static org.apache.pulsar.common.util.Codec.decode;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -3386,4 +3387,46 @@ public class PersistentTopicsBase extends AdminResource {
         return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
     }
 
+    protected Optional<SubscribeRate> internalGetSubscribeRate() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+
+        checkTopicLevelPolicyEnable();
+        return getTopicPolicies(topicName).map(TopicPolicies::getSubscribeRate);
+    }
+
+    protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate subscribeRate) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        if (subscribeRate == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        TopicPolicies topicPolicies = getTopicPolicies(topicName)
+                .orElseGet(TopicPolicies::new);
+        topicPolicies.setSubscribeRate(subscribeRate);
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+    }
+
+    protected CompletableFuture<Void> internalRemoveSubscribeRate() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
+        if (!topicPolicies.isPresent()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        topicPolicies.get().setSubscribeRate(null);
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 68a12a5..5e93c91 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.v2;
 
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import static org.apache.pulsar.common.util.Codec.decode;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -2347,5 +2348,92 @@ public class PersistentTopics extends PersistentTopicsBase {
         });
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/subscribeRate")
+    @ApiOperation(value = "Get subscribe rate configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getSubscribeRate(@Suspended final AsyncResponse asyncResponse,
+                                @PathParam("tenant") String tenant,
+                                @PathParam("namespace") String namespace,
+                                @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            Optional<SubscribeRate> subscribeRate = internalGetSubscribeRate();
+            if (!subscribeRate.isPresent()) {
+                asyncResponse.resume(Response.noContent().build());
+            } else {
+                asyncResponse.resume(subscribeRate.get());
+            }
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/subscribeRate")
+    @ApiOperation(value = "Set subscribe rate configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void setSubscribeRate(@Suspended final AsyncResponse asyncResponse,
+                                @PathParam("tenant") String tenant,
+                                @PathParam("namespace") String namespace,
+                                @PathParam("topic") @Encoded String encodedTopic,
+                                @ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetSubscribeRate(subscribeRate).whenComplete((r, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed to set topic {} subscribe rate", topicName.getLocalName(), ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed to set topic subscribe rate");
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                try {
+                    log.info("[{}] Successfully set topic subscribe rate: tenant={}, namespace={}, topic={}, subscribeRate={}",
+                            clientAppId(),
+                            tenant,
+                            namespace,
+                            topicName.getLocalName(),
+                            jsonMapper().writeValueAsString(subscribeRate));
+                } catch (JsonProcessingException ignore) {}
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/subscribeRate")
+    @ApiOperation(value = "Remove subscribe rate configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeSubscribeRate(@Suspended final AsyncResponse asyncResponse,
+                                   @PathParam("tenant") String tenant,
+                                   @PathParam("namespace") String namespace,
+                                   @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalRemoveSubscribeRate().whenComplete((r, ex) -> {
+            if (ex != null) {
+                log.error("Failed to remove topic {} subscribe rate ", topicName.getLocalName(), ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully remove topic subscribe rate: tenant={}, namespace={}, topic={}",
+                        clientAppId(),
+                        tenant,
+                        namespace,
+                        topicName.getLocalName());
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 292e070..f83677e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -230,7 +230,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
 
         initializeDispatchRateLimiterIfNeeded(Optional.empty());
-        brokerService.getPulsar().getTopicPoliciesService().registerListener(TopicName.get(topic), this);
+        registerTopicPolicyListener();
+
 
         this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
 
@@ -581,6 +582,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 future.completeExceptionally(new NotAllowedException("Subscribe limited by subscribe rate limit per consumer."));
                 return future;
             }
+
         }
 
         lock.readLock().lock();
@@ -2403,6 +2405,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
                     , cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled());
         }
+
+        initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
+        if (this.subscribeRateLimiter.isPresent() && policies != null) {
+            subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
+                subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
+        }
     }
 
     private Optional<Policies> getNamespacePolicies(){
@@ -2423,6 +2431,15 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
     }
 
+    private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
+        synchronized (subscribeRateLimiter) {
+            if (!subscribeRateLimiter.isPresent() && policies.isPresent() &&
+                    policies.get().getSubscribeRate() != null) {
+                this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
+            }
+        }
+    }
+
     private PersistentTopic getPersistentTopic() {
         return this;
     }
@@ -2454,6 +2471,19 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
     }
 
+    private void registerTopicPolicyListener() {
+        if (brokerService.pulsar().getConfig().isSystemTopicEnabled() &&
+                brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+            TopicName topicName = TopicName.get(topic);
+            TopicName cloneTopicName = topicName;
+            if (topicName.isPartitioned()) {
+                cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
+            }
+
+            brokerService.getPulsar().getTopicPoliciesService().registerListener(cloneTopicName, this);
+        }
+    }
+
     @VisibleForTesting
     public MessageDeduplication getMessageDeduplication() {
         return messageDeduplication;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index ee07250..30f13fc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -23,8 +23,10 @@ import com.google.common.base.MoreObjects;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +52,17 @@ public class SubscribeRateLimiter {
         this.brokerService = topic.getBrokerService();
         subscribeRateLimiter = new ConcurrentHashMap<>();
         this.executorService = brokerService.pulsar().getExecutor();
-        this.subscribeRate = getPoliciesSubscribeRate();
+        // get subscribeRate from topic level policies
+        this.subscribeRate = Optional.ofNullable(brokerService.getTopicPolicies(TopicName.get(this.topicName)))
+                .map(TopicPolicies::getSubscribeRate)
+                .orElse(null);
+
+        // subscribeRate of topic level policies not set, get from zookeeper
+        if (this.subscribeRate == null) {
+            this.subscribeRate = getPoliciesSubscribeRate();
+        }
+
+        // get subscribeRate from broker.conf
         if (this.subscribeRate == null) {
             this.subscribeRate = new SubscribeRate(brokerService.pulsar().getConfiguration().getSubscribeThrottlingRatePerConsumer(),
                     brokerService.pulsar().getConfiguration().getSubscribeRatePeriodPerConsumerInSecond());
@@ -95,9 +107,10 @@ public class SubscribeRateLimiter {
      * default broker subscribe-throttling-rate
      */
     private synchronized void addSubscribeLimiterIfAbsent(ConsumerIdentifier consumerIdentifier) {
-        if (subscribeRateLimiter.get(consumerIdentifier) != null) {
+        if (subscribeRateLimiter.get(consumerIdentifier) != null || !isSubscribeRateEnabled(this.subscribeRate)) {
             return;
         }
+
         updateSubscribeRate(consumerIdentifier, this.subscribeRate);
     }
 
@@ -134,31 +147,52 @@ public class SubscribeRateLimiter {
     }
 
     public void onPoliciesUpdate(Policies data) {
+        // if subscribe rate is set on topic policy, skip subscribe rate update
+        SubscribeRate subscribeRate = Optional.ofNullable(brokerService.getTopicPolicies(TopicName.get(topicName)))
+                .map(TopicPolicies::getSubscribeRate)
+                .orElse(null);
+        if (subscribeRate != null) {
+            return;
+        }
 
         String cluster = brokerService.pulsar().getConfiguration().getClusterName();
 
-        SubscribeRate subscribeRate = data.clusterSubscribeRate.get(cluster);
+        subscribeRate = data.clusterSubscribeRate.get(cluster);
 
-        // update dispatch-rate only if it's configured in policies else ignore
-        if (subscribeRate != null) {
-            final SubscribeRate newSubscribeRate = new SubscribeRate(
-                    brokerService.pulsar().getConfiguration().getSubscribeThrottlingRatePerConsumer(),
-                    brokerService.pulsar().getConfiguration().getSubscribeRatePeriodPerConsumerInSecond()
-                    );
-            // if policy-throttling rate is disabled and cluster-throttling is enabled then apply
-            // cluster-throttling rate
-            if (!isSubscribeRateEnabled(subscribeRate) && isSubscribeRateEnabled(newSubscribeRate)) {
-                subscribeRate = newSubscribeRate;
-            }
-            this.subscribeRate = subscribeRate;
-            stopResetTask();
-            for (ConsumerIdentifier consumerIdentifier : this.subscribeRateLimiter.keySet()) {
+        onSubscribeRateUpdate(subscribeRate);
+
+    }
+
+    public void onSubscribeRateUpdate(SubscribeRate subscribeRate) {
+        final SubscribeRate namespacePolicySubscribeRate = getPoliciesSubscribeRate();
+        final SubscribeRate newSubscribeRate = new SubscribeRate(
+                brokerService.pulsar().getConfiguration().getSubscribeThrottlingRatePerConsumer(),
+                brokerService.pulsar().getConfiguration().getSubscribeRatePeriodPerConsumerInSecond()
+                );
+
+        // if policy-throttling rate is disabled and cluster-throttling is enabled then apply
+        // cluster-throttling rate
+        // if topic policy-throttling rate is disabled
+        if (!isSubscribeRateEnabled(subscribeRate) && isSubscribeRateEnabled(namespacePolicySubscribeRate)) {
+            subscribeRate = namespacePolicySubscribeRate;
+        }
+
+        if (!isSubscribeRateEnabled(subscribeRate) && !isSubscribeRateEnabled(namespacePolicySubscribeRate)
+                && isSubscribeRateEnabled(newSubscribeRate)) {
+            subscribeRate = newSubscribeRate;
+        }
+        this.subscribeRate = subscribeRate;
+        stopResetTask();
+        for (ConsumerIdentifier consumerIdentifier : this.subscribeRateLimiter.keySet()) {
+            if (!isSubscribeRateEnabled(this.subscribeRate)) {
+                removeSubscribeLimiter(consumerIdentifier);
+            } else {
                 updateSubscribeRate(consumerIdentifier, subscribeRate);
             }
-            if (isSubscribeRateEnabled(this.subscribeRate)) {
-                this.resetTask = createTask();
-                log.info("[{}] configured subscribe-dispatch rate at broker {}", this.topicName, subscribeRate);
-            }
+        }
+        if (isSubscribeRateEnabled(this.subscribeRate)) {
+            this.resetTask = createTask();
+            log.info("[{}] configured subscribe-dispatch rate at broker {}", this.topicName, subscribeRate);
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
index fec0395..f0f8d79 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -254,4 +255,24 @@ public class TopicPoliciesDisableTest extends MockedPulsarServiceBaseTest {
             Assert.assertEquals(e.getStatusCode(), 405);
         }
     }
+
+    @Test
+    public void testSubscribeRateDisabled() throws Exception {
+        SubscribeRate subscribeRate = new SubscribeRate(10, 30);
+        log.info("Subscribe Rate: {} will set to the topic: {}", subscribeRate, testTopic);
+
+        try {
+            admin.topics().setSubscribeRate(testTopic, subscribeRate);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+
+        try {
+            admin.topics().getSubscribeRate(testTopic);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index ef6eb40..c0e7655 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import static org.testng.Assert.assertEquals;
 
 import com.google.common.collect.Sets;
@@ -788,4 +790,181 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
         admin.topics().deletePartitionedTopic(persistenceTopic, true);
         admin.topics().deletePartitionedTopic(testTopic, true);
     }
+
+    @Test
+    public void testGetSetSubscribeRate() throws Exception {
+        admin.topics().createPartitionedTopic(persistenceTopic, 2);
+        Producer producer = pulsarClient.newProducer().topic(testTopic).create();
+        producer.close();
+
+        SubscribeRate subscribeRate = new SubscribeRate(1, 30);
+        log.info("Subscribe Rate: {} will be set to the namespace: {}", subscribeRate, myNamespace);
+        admin.namespaces().setSubscribeRate(myNamespace, subscribeRate);
+        log.info("Subscribe Rate set success on namespace: {}", myNamespace);
+        Thread.sleep(3000);
+
+        subscribeRate =  new SubscribeRate(2, 30);
+        log.info("Subscribe Rate: {} will set to the topic: {}", subscribeRate, persistenceTopic);
+        admin.topics().setSubscribeRate(persistenceTopic, subscribeRate);
+        log.info("Subscribe Rate set success on topic: {}", persistenceTopic);
+
+        Thread.sleep(3000);
+
+        PulsarClient pulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);
+        PulsarClient pulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);
+        PulsarClient pulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);
+
+        Consumer consumer1 = null;
+        Consumer consumer2 = null;
+        Consumer consumer3 = null;
+
+        try {
+            consumer1 = pulsarClient1.newConsumer().subscriptionName("sub1")
+                    .topic(persistenceTopic).consumerName("test").subscribe();
+            Assert.assertNotNull(consumer1);
+            consumer1.close();
+            pulsarClient1.shutdown();
+        } catch (PulsarClientException e) {
+            Assert.fail();
+        }
+
+        try {
+            consumer2 = pulsarClient2.newConsumer().subscriptionName("sub1")
+                    .topic(persistenceTopic).consumerName("test").subscribe();
+            Assert.assertNotNull(consumer2);
+            consumer2.close();
+            pulsarClient2.shutdown();
+        } catch (PulsarClientException e) {
+            Assert.fail();
+        }
+
+        try {
+            consumer3 = pulsarClient3.newConsumer().subscriptionName("sub1")
+                    .topic(persistenceTopic).consumerName("test").subscribe();
+            Assert.fail();
+        } catch (PulsarClientException e) {
+            log.info("subscribe rate reached max subscribe rate limit");
+        }
+
+        Assert.assertNull(consumer3);
+        pulsarClient3.shutdown();
+
+        SubscribeRate getSubscribeRate = admin.topics().getSubscribeRate(persistenceTopic);
+        log.info("Subscribe Rate: {} get on topic: {}", getSubscribeRate, persistenceTopic);
+        Assert.assertEquals(getSubscribeRate, subscribeRate);
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+        admin.topics().deletePartitionedTopic(persistenceTopic, true);
+    }
+
+    @Test
+    public void testRemoveSubscribeRate() throws Exception {
+        admin.topics().createPartitionedTopic(persistenceTopic, 2);
+        Producer producer = pulsarClient.newProducer().topic(testTopic).create();
+        producer.close();
+
+        SubscribeRate subscribeRate = new SubscribeRate(2, 30);
+        log.info("Subscribe Rate: {} will set to the topic: {}", subscribeRate, persistenceTopic);
+        admin.topics().setSubscribeRate(persistenceTopic, subscribeRate);
+        log.info("Subscribe Rate set success on topic: {}", persistenceTopic);
+
+        Thread.sleep(3000);
+
+        PulsarClient pulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);
+        PulsarClient pulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);
+        PulsarClient pulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);
+
+        Consumer consumer1 = null;
+        Consumer consumer2 = null;
+        Consumer consumer3 = null;
+
+        try {
+            consumer1 = pulsarClient1.newConsumer().subscriptionName("sub1")
+                    .topic(persistenceTopic).consumerName("test").subscribe();
+            Assert.assertNotNull(consumer1);
+            consumer1.close();
+            pulsarClient1.shutdown();
+        } catch (PulsarClientException e) {
+            Assert.fail();
+        }
+
+        try {
+            consumer2 = pulsarClient2.newConsumer().subscriptionName("sub1")
+                    .topic(persistenceTopic).consumerName("test").subscribe();
+            Assert.assertNotNull(consumer2);
+            consumer2.close();
+            pulsarClient2.shutdown();
+        } catch (PulsarClientException e) {
+            Assert.fail();
+        }
+
+        try {
+            consumer3 = pulsarClient3.newConsumer().subscriptionName("sub1")
+                    .topic(persistenceTopic).consumerName("test").subscribe();
+            Assert.fail();
+        } catch (PulsarClientException e) {
+            log.info("subscribe rate reached max subscribe rate limit");
+        }
+        Assert.assertNull(consumer3);
+
+        SubscribeRate getSubscribeRate = admin.topics().getSubscribeRate(persistenceTopic);
+        log.info("Subscribe Rate: {} get on topic: {}", getSubscribeRate, persistenceTopic);
+        Assert.assertEquals(getSubscribeRate, subscribeRate);
+
+        admin.topics().removeSubscribeRate(persistenceTopic);
+        Thread.sleep(3000);
+        log.info("Subscribe Rate get on topic: {} after remove", getSubscribeRate, persistenceTopic);
+        getSubscribeRate = admin.topics().getSubscribeRate(persistenceTopic);
+        Assert.assertNull(getSubscribeRate);
+
+        PulsarClient pulsarClient4 = newPulsarClient(lookupUrl.toString(), 0);
+        PulsarClient pulsarClient5 = newPulsarClient(lookupUrl.toString(), 0);
+        PulsarClient pulsarClient6 = newPulsarClient(lookupUrl.toString(), 0);
+
+        Consumer consumer4 = null;
+        Consumer consumer5 = null;
+        Consumer consumer6 = null;
+
+        try {
+            consumer3 = pulsarClient3.newConsumer().subscriptionName("sub2")
+                    .topic(persistenceTopic).consumerName("test").subscribe();
+            Assert.assertNotNull(consumer3);
+            consumer3.close();
+            pulsarClient3.shutdown();
+        } catch (PulsarClientException e) {
+            Assert.fail();
+        }
+
+        try {
+            consumer4 = pulsarClient4.newConsumer().subscriptionName("sub2")
+                    .topic(persistenceTopic).consumerName("test").subscribe();
+            Assert.assertNotNull(consumer4);
+            consumer4.close();
+            pulsarClient4.shutdown();
+        } catch (PulsarClientException e) {
+            Assert.fail();
+        }
+        try {
+            consumer5 = pulsarClient5.newConsumer().subscriptionName("sub2")
+                    .topic(persistenceTopic).consumerName("test").subscribe();
+            Assert.assertNotNull(consumer5);
+            consumer5.close();
+            pulsarClient5.shutdown();
+        } catch (PulsarClientException e) {
+            Assert.fail();
+        }
+
+        try {
+            consumer6 = pulsarClient6.newConsumer().subscriptionName("sub2")
+                    .topic(persistenceTopic).consumerName("test").subscribe();
+            Assert.assertNotNull(consumer6);
+            consumer6.close();
+            pulsarClient6.shutdown();
+        } catch (PulsarClientException e) {
+            Assert.fail();
+        }
+
+        admin.topics().deletePartitionedTopic(persistenceTopic, true);
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 282a3a4..1994ee2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TopicStats;
 /**
  * Admin interface for Topics management.
@@ -2301,7 +2302,6 @@ public interface Topics {
      */
     CompletableFuture<Void> removeMaxProducersAsync(String topic);
 
-
     /**
      * Get the max number of consumer for specified topic.
      *
@@ -2353,4 +2353,66 @@ public interface Topics {
      * @param topic Topic name
      */
     CompletableFuture<Void> removeMaxConsumersAsync(String topic);
+
+    /**
+     * Set topic-subscribe-rate (topic will limit by subscribeRate).
+     *
+     * @param topic
+     * @param subscribeRate
+     *            consumer subscribe limit by this subscribeRate
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setSubscribeRate(String topic, SubscribeRate subscribeRate) throws PulsarAdminException;
+
+    /**
+     * Set topic-subscribe-rate (topics will limit by subscribeRate) asynchronously.
+     *
+     * @param topic
+     * @param subscribeRate
+     *            consumer subscribe limit by this subscribeRate
+     */
+    CompletableFuture<Void> setSubscribeRateAsync(String topic, SubscribeRate subscribeRate);
+
+    /**
+     * Get topic-subscribe-rate (topics allow subscribe times per consumer in a period).
+     *
+     * @param topic
+     * @returns subscribeRate
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    SubscribeRate getSubscribeRate(String topic) throws PulsarAdminException;
+
+    /**
+     * Get topic-subscribe-rate asynchronously.
+     * <p/>
+     * Topic allow subscribe times per consumer in a period.
+     *
+     * @param topic
+     * @returns subscribeRate
+     */
+    CompletableFuture<SubscribeRate> getSubscribeRateAsync(String topic);
+
+    /**
+     * Remove topic-subscribe-rate.
+     * <p/>
+     * Remove topic subscribe rate
+     *
+     * @param topic
+     * @throws PulsarAdminException
+     *              unexpected error
+     */
+    void removeSubscribeRate(String topic) throws PulsarAdminException;
+
+    /**
+     * Remove topic-subscribe-rate asynchronously.
+     * <p/>
+     * Remove topic subscribe rate
+     *
+     * @param topic
+     * @throws PulsarAdminException
+     *              unexpected error
+     */
+    CompletableFuture<Void> removeSubscribeRateAsync(String topic) throws PulsarAdminException;
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index ca4e786..eb30d9c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -79,6 +79,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.Codec;
@@ -2545,5 +2546,82 @@ public class TopicsImpl extends BaseResource implements Topics {
         return asyncDeleteRequest(path);
     }
 
+
+    @Override
+    public SubscribeRate getSubscribeRate(String topic) throws PulsarAdminException {
+        try {
+            return getSubscribeRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<SubscribeRate> getSubscribeRateAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "subscribeRate");
+        final CompletableFuture<SubscribeRate> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<SubscribeRate>() {
+                    @Override
+                    public void completed(SubscribeRate subscribeRate) {
+                        future.complete(subscribeRate);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void setSubscribeRate(String topic, SubscribeRate subscribeRate) throws PulsarAdminException {
+        try {
+            setSubscribeRateAsync(topic, subscribeRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setSubscribeRateAsync(String topic, SubscribeRate subscribeRate) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "subscribeRate");
+        return asyncPostRequest(path, Entity.entity(subscribeRate, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void removeSubscribeRate(String topic) throws PulsarAdminException {
+        try {
+            removeSubscribeRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeSubscribeRateAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "subscribeRate");
+        return asyncDeleteRequest(path);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 54bf2e7..9649cad 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
 
 @Parameters(commandDescription = "Operations on persistent topics")
@@ -159,9 +160,14 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("get-inactive-topic-policies", new GetInactiveTopicPolicies());
         jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
         jcommander.addCommand("remove-inactive-topic-policies", new RemoveInactiveTopicPolicies());
+
         jcommander.addCommand("get-max-consumers", new GetMaxConsumers());
         jcommander.addCommand("set-max-consumers", new SetMaxConsumers());
         jcommander.addCommand("remove-max-consumers", new RemoveMaxConsumers());
+
+        jcommander.addCommand("get-subscribe-rate", new GetSubscribeRate());
+        jcommander.addCommand("set-subscribe-rate", new SetSubscribeRate());
+        jcommander.addCommand("remove-subscribe-rate", new RemoveSubscribeRate());
     }
 
     @Parameters(commandDescription = "Get the list of topics under a namespace.")
@@ -1650,4 +1656,49 @@ public class CmdTopics extends CmdBase {
             admin.topics().removeMaxConsumers(persistentTopic);
         }
     }
+
+    @Parameters(commandDescription = "Get consumer subscribe rate for a topic")
+    private class GetSubscribeRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(admin.topics().getSubscribeRate(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set consumer subscribe rate for a topic")
+    private class SetSubscribeRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--subscribe-rate",
+                "-sr" }, description = "subscribe-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private int subscribeRate = -1;
+
+        @Parameter(names = { "--subscribe-rate-period",
+                "-st" }, description = "subscribe-rate-period in second type (default 30 second will be overwrite if not passed)\n", required = false)
+        private int subscribeRatePeriodSec = 30;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().setSubscribeRate(persistentTopic,
+                    new SubscribeRate(subscribeRate, subscribeRatePeriodSec));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove consumer subscribe rate for a topic")
+    private class RemoveSubscribeRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().removeSubscribeRate(persistentTopic);
+        }
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 343770d..fb785b6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -54,6 +54,7 @@ public class TopicPolicies {
     private DispatchRate dispatchRate = null;
     private Long compactionThreshold = null;
     private PublishRate publishRate = null;
+    private SubscribeRate subscribeRate = null;
 
     public boolean isInactiveTopicPoliciesSet() {
         return inactiveTopicPolicies != null;
@@ -122,4 +123,8 @@ public class TopicPolicies {
     public boolean isPublishRateSet() {
         return publishRate != null;
     }
+
+    public boolean isSubscribeRateSet() {
+        return subscribeRate != null;
+    }
 }