You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/09/11 12:48:07 UTC

[pulsar] branch master updated: support max consumers per subscription on topic level (#8003)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e4b12df  support max consumers per subscription on topic level (#8003)
e4b12df is described below

commit e4b12df27c1c8edd44db18e1a6f433393cc46977
Author: hangc0276 <ha...@163.com>
AuthorDate: Fri Sep 11 20:47:44 2020 +0800

    support max consumers per subscription on topic level (#8003)
    
    ### Modifications
    Support set max consumers per subscription on topic level.
    Support get max consumers per subscription on topic level.
    Support remove max consumers per subscription on topic level.
    
    ### Verifying this change
    This change added tests and can be verified as follows:
    - test set topic max consumers per subscription
    - test remove topic max consumers per subscription
    - test get topic topic max consumers per subscription
    - test disabled topic max consumers per subscription
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 42 +++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 87 +++++++++++++++++++++
 .../pulsar/broker/service/BrokerService.java       | 32 ++++++++
 ...onPersistentDispatcherSingleActiveConsumer.java | 35 ++++++---
 .../PersistentDispatcherMultipleConsumers.java     | 32 +++++---
 .../PersistentDispatcherSingleActiveConsumer.java  | 32 +++++---
 .../broker/admin/TopicPoliciesDisableTest.java     | 27 +++++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 40 +++++++++-
 .../org/apache/pulsar/client/admin/Topics.java     | 88 ++++++++++++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 78 +++++++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 51 ++++++++++++-
 11 files changed, 510 insertions(+), 34 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 085ad12..9a81e9a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3260,6 +3260,48 @@ public class PersistentTopicsBase extends AdminResource {
 
     }
 
+    protected Optional<Integer> internalGetMaxConsumersPerSubscription() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        return getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumersPerSubscription);
+    }
+
+    protected CompletableFuture<Void> internalSetMaxConsumersPerSubscription(Integer maxConsumersPerSubscription) {
+        if (maxConsumersPerSubscription != null && maxConsumersPerSubscription < 0) {
+            throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for maxConsumersPerSubscription");
+        }
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+
+        TopicPolicies topicPolicies = getTopicPolicies(topicName)
+                .orElseGet(TopicPolicies::new);
+        topicPolicies.setMaxConsumersPerSubscription(maxConsumersPerSubscription);
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+    }
+
+    protected CompletableFuture<Void> internalRemoveMaxConsumersPerSubscription() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
+        if (!topicPolicies.isPresent()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        topicPolicies.get().setMaxConsumersPerSubscription(null);
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+    }
+
     protected Optional<Long> internalGetCompactionThreshold() {
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index cc59ec4..68a12a5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2174,6 +2174,93 @@ public class PersistentTopics extends PersistentTopicsBase {
     }
 
     @GET
+    @Path("/{tenant}/{namespace}/{topic}/maxConsumersPerSubscription")
+    @ApiOperation(value = "Get max consumers per subscription configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getMaxConsumersPerSubscription(@Suspended final AsyncResponse asyncResponse,
+                                       @PathParam("tenant") String tenant,
+                                       @PathParam("namespace") String namespace,
+                                       @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            Optional<Integer> maxConsumersPerSubscription = internalGetMaxConsumersPerSubscription();
+            if (!maxConsumersPerSubscription.isPresent()) {
+                asyncResponse.resume(Response.noContent().build());
+            } else {
+                asyncResponse.resume(maxConsumersPerSubscription.get());
+            }
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/maxConsumersPerSubscription")
+    @ApiOperation(value = "Set max consumers per subscription configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void setMaxConsumersPerSubscription(@Suspended final AsyncResponse asyncResponse,
+                                       @PathParam("tenant") String tenant,
+                                       @PathParam("namespace") String namespace,
+                                       @PathParam("topic") @Encoded String encodedTopic,
+                                       @ApiParam(value = "Dispatch rate for the specified topic") int maxConsumersPerSubscription) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription).whenComplete((r, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed to set topic {} max consumers per subscription ", topicName.getLocalName(), ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed to set topic max consumers per subscription");
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                try {
+                    log.info("[{}] Successfully set topic max consumers per subscription: tenant={}, namespace={}, topic={}, maxConsumersPerSubscription={}",
+                            clientAppId(),
+                            tenant,
+                            namespace,
+                            topicName.getLocalName(),
+                            jsonMapper().writeValueAsString(maxConsumersPerSubscription));
+                } catch (JsonProcessingException ignore) {}
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxConsumersPerSubscription")
+    @ApiOperation(value = "Remove max consumers per subscription configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeMaxConsumersPerSubscription(@Suspended final AsyncResponse asyncResponse,
+                                          @PathParam("tenant") String tenant,
+                                          @PathParam("namespace") String namespace,
+                                          @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalRemoveMaxConsumersPerSubscription().whenComplete((r, ex) -> {
+            if (ex != null) {
+                log.error("Failed to remove topic {} max consuners per subscription", topicName.getLocalName(), ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully remove topic max consumers per subscription: tenant={}, namespace={}, topic={}",
+                        clientAppId(),
+                        tenant,
+                        namespace,
+                        topicName.getLocalName());
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @GET
     @Path("/{tenant}/{namespace}/{topic}/publishRate")
     @ApiOperation(value = "Get publish rate configuration for specified topic.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1980419..e633c88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import javax.ws.rs.core.Response;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.apache.commons.collections.CollectionUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -149,6 +150,7 @@ import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FieldParser;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -2361,6 +2363,36 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         return SystemTopicClient.isSystemTopic(TopicName.get(topic));
     }
 
+    /**
+     * Get {@link TopicPolicies} for this topic.
+     * @param topicName
+     * @return TopicPolicies is exist else return null.
+     */
+    public TopicPolicies getTopicPolicies(TopicName topicName) {
+        TopicName cloneTopicName = topicName;
+        if (topicName.isPartitioned()) {
+            cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
+        }
+        try {
+            checkTopicLevelPolicyEnable();
+            return pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.warn("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
+            return null;
+        } catch (RestException | NullPointerException e) {
+            log.warn("Topic level policies are not enabled. " +
+                    "Please refer to systemTopicEnabled and topicLevelPoliciesEnabled on broker.conf");
+            return null;
+        }
+    }
+
+    private void checkTopicLevelPolicyEnable() {
+        if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+            throw new RestException(Response.Status.METHOD_NOT_ALLOWED,
+                    "Topic level policies is disabled, to enable the topic level policy and retry.");
+        }
+    }
+
     public void setInterceptor(BrokerInterceptor interceptor) {
         this.interceptor = interceptor;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 8c642bf..7063990 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
 import java.util.List;
@@ -32,12 +34,14 @@ import org.apache.pulsar.broker.service.RedeliveryTracker;
 import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.stats.Rate;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 
+@Slf4j
 public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher {
 
     private final NonPersistentTopic topic;
@@ -77,21 +81,32 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
     }
 
     protected boolean isConsumersExceededOnSubscription() {
-        Policies policies;
+        Policies policies = null;
+        Integer maxConsumersPerSubscription = null;
         try {
-            // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
-            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
-
-            if (policies == null) {
-                policies = new Policies();
+            maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
+                    .getTopicPolicies(TopicName.get(topicName)))
+                    .map(TopicPolicies::getMaxConsumersPerSubscription)
+                    .orElse(null);
+            if (maxConsumersPerSubscription == null) {
+                // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
+                policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                        .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
+
+                if (policies == null) {
+                    policies = new Policies();
+                }
             }
         } catch (Exception e) {
             policies = new Policies();
         }
-        final int maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
-                policies.max_consumers_per_subscription :
-                serviceConfig.getMaxConsumersPerSubscription();
+
+        if (maxConsumersPerSubscription == null) {
+            maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
+                    policies.max_consumers_per_subscription :
+                    serviceConfig.getMaxConsumersPerSubscription();
+        }
+
         if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
             return true;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 5a453e1..a053681 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -43,7 +43,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadE
 import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.logging.log4j.util.Strings;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
@@ -65,6 +64,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
 import org.apache.pulsar.common.util.collections.LongPairSet;
@@ -154,20 +154,32 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     }
 
     private boolean isConsumersExceededOnSubscription() {
-        Policies policies;
+        Policies policies = null;
+        Integer maxConsumersPerSubscription = null;
         try {
-            // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
-            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
-            if (policies == null) {
-                policies = new Policies();
+            maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
+                    .getTopicPolicies(TopicName.get(topic.getName())))
+                    .map(TopicPolicies::getMaxConsumersPerSubscription)
+                    .orElse(null);
+
+            if (maxConsumersPerSubscription == null) {
+                // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
+                policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                        .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
+                if (policies == null) {
+                    policies = new Policies();
+                }
             }
         } catch (Exception e) {
             policies = new Policies();
         }
-        final int maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
-                policies.max_consumers_per_subscription :
-                serviceConfig.getMaxConsumersPerSubscription();
+
+        if (maxConsumersPerSubscription == null) {
+            maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
+                    policies.max_consumers_per_subscription :
+                    serviceConfig.getMaxConsumersPerSubscription();
+        }
+
         if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) {
             return true;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 2d33f55..0e6d747 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -135,21 +136,32 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
     }
 
     protected boolean isConsumersExceededOnSubscription() {
-        Policies policies;
+        Policies policies = null;
+        Integer maxConsumersPerSubscription = null;
         try {
-            // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
-            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
-
-            if (policies == null) {
-                policies = new Policies();
+            maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
+                    .getTopicPolicies(TopicName.get(topicName)))
+                    .map(TopicPolicies::getMaxConsumersPerSubscription)
+                    .orElse(null);
+            if (maxConsumersPerSubscription == null) {
+                // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
+                policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                        .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
+
+                if (policies == null) {
+                    policies = new Policies();
+                }
             }
         } catch (Exception e) {
             policies = new Policies();
         }
-        final int maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
-                policies.max_consumers_per_subscription :
-                serviceConfig.getMaxConsumersPerSubscription();
+
+        if (maxConsumersPerSubscription == null) {
+            maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
+                    policies.max_consumers_per_subscription :
+                    serviceConfig.getMaxConsumersPerSubscription();
+        }
+
         if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
             return true;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
index 96f874a..fec0395 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
@@ -173,6 +173,33 @@ public class TopicPoliciesDisableTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testMaxConsumersPerSubscription() throws Exception {
+        int maxConsumersPerSubscription = 10;
+        log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", maxConsumersPerSubscription, testTopic);
+
+        try {
+            admin.topics().setMaxConsumersPerSubscription(testTopic, maxConsumersPerSubscription);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+
+        try {
+            admin.topics().getMaxConsumersPerSubscription(testTopic);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+
+        try {
+            admin.topics().removeMaxConsumersPerSubscription(testTopic);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+    }
+
+    @Test
     public void testPublishRateDisabled() throws Exception {
         PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
         log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 684a102..ef6eb40 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -589,14 +589,52 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
 
         admin.topics().removeCompactionThreshold(testTopic);
         Thread.sleep(3000);
-        log.info("Compaction threshold get on topic: {} after remove", getCompactionThreshold, testTopic);
         getCompactionThreshold = admin.topics().getCompactionThreshold(testTopic);
+        log.info("Compaction threshold get on topic: {} after remove", getCompactionThreshold, testTopic);
         Assert.assertNull(getCompactionThreshold);
 
         admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
     @Test
+    public void testGetSetMaxConsumersPerSubscription() throws Exception {
+        Integer maxConsumersPerSubscription = 10;
+        log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", maxConsumersPerSubscription, testTopic);
+
+        admin.topics().setMaxConsumersPerSubscription(testTopic, maxConsumersPerSubscription);
+        log.info("MaxConsumersPerSubscription set success on topic: {}", testTopic);
+
+        Thread.sleep(3000);
+        Integer getMaxConsumersPerSubscription = admin.topics().getMaxConsumersPerSubscription(testTopic);
+        log.info("MaxConsumersPerSubscription: {} get on topic: {}", getMaxConsumersPerSubscription, testTopic);
+        Assert.assertEquals(getMaxConsumersPerSubscription, maxConsumersPerSubscription);
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
+
+    @Test
+    public void testRemoveMaxConsumersPerSubscription() throws Exception {
+        Integer maxConsumersPerSubscription = 10;
+        log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", maxConsumersPerSubscription, testTopic);
+
+        admin.topics().setMaxConsumersPerSubscription(testTopic, maxConsumersPerSubscription);
+        log.info("MaxConsumersPerSubscription set success on topic: {}", testTopic);
+
+        Thread.sleep(3000);
+        Integer getMaxConsumersPerSubscription = admin.topics().getMaxConsumersPerSubscription(testTopic);
+        log.info("MaxConsumersPerSubscription: {} get on topic: {}", getMaxConsumersPerSubscription, testTopic);
+        Assert.assertEquals(getMaxConsumersPerSubscription, maxConsumersPerSubscription);
+
+        admin.topics().removeMaxConsumersPerSubscription(testTopic);
+        Thread.sleep(3000);
+        getMaxConsumersPerSubscription = admin.topics().getMaxConsumersPerSubscription(testTopic);
+        log.info("MaxConsumersPerSubscription get on topic: {} after remove", getMaxConsumersPerSubscription, testTopic);
+        Assert.assertNull(getMaxConsumersPerSubscription);
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
+
+    @Test
     public void testGetSetPublishRate() throws Exception {
         PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
         log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index c27baf6..282a3a4 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2159,7 +2159,95 @@ public interface Topics {
      */
     CompletableFuture<Void> removePublishRateAsync(String topic) throws PulsarAdminException;
 
+    /**
+     * Get the maxConsumersPerSubscription for a topic.
+     * <p/>
+     * Response example:
+     *
+     * <pre>
+     * <code>0</code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    Integer getMaxConsumersPerSubscription(String topic) throws PulsarAdminException;
+
+    /**
+     * Get the maxConsumersPerSubscription for a topic asynchronously.
+     * <p/>
+     * Response example:
+     *
+     * <pre>
+     * <code>0</code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     */
+    CompletableFuture<Integer> getMaxConsumersPerSubscriptionAsync(String topic);
+
+    /**
+     * Set maxConsumersPerSubscription for a topic.
+     * <p/>
+     * Request example:
+     *
+     * <pre>
+     * <code>10</code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     * @param maxConsumersPerSubscription
+     *            maxConsumersPerSubscription value for a namespace
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setMaxConsumersPerSubscription(String topic, int maxConsumersPerSubscription) throws PulsarAdminException;
+
+    /**
+     * Set maxConsumersPerSubscription for a topic asynchronously.
+     * <p/>
+     * Request example:
+     *
+     * <pre>
+     * <code>10</code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     * @param maxConsumersPerSubscription
+     *            maxConsumersPerSubscription value for a namespace
+     */
+    CompletableFuture<Void> setMaxConsumersPerSubscriptionAsync(String topic, int maxConsumersPerSubscription);
 
+    /**
+     * Remove the maxConsumersPerSubscription for a topic.
+     * @param topic
+     *            Topic name
+     * @throws PulsarAdminException
+     *            Unexpected error
+     */
+    void removeMaxConsumersPerSubscription(String topic) throws PulsarAdminException;
+
+    /**
+     * Remove the maxConsumersPerSubscription for a topic asynchronously.
+     * @param topic
+     *            Topic name
+     */
+    CompletableFuture<Void> removeMaxConsumersPerSubscriptionAsync(String topic);
 
     /**
      * Get the max number of producer for specified topic.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 7e7233f..ca4e786 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2316,6 +2316,84 @@ public class TopicsImpl extends BaseResource implements Topics {
     }
 
     @Override
+    public Integer getMaxConsumersPerSubscription(String topic) throws PulsarAdminException {
+        try {
+            return getMaxConsumersPerSubscriptionAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Integer> getMaxConsumersPerSubscriptionAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "maxConsumersPerSubscription");
+        final CompletableFuture<Integer> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Integer>() {
+                    @Override
+                    public void completed(Integer maxConsumersPerSubscription) {
+                        future.complete(maxConsumersPerSubscription);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void setMaxConsumersPerSubscription(String topic, int maxConsumersPerSubscription)
+        throws PulsarAdminException {
+        try {
+            setMaxConsumersPerSubscriptionAsync(topic, maxConsumersPerSubscription)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setMaxConsumersPerSubscriptionAsync(String topic, int maxConsumersPerSubscription) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "maxConsumersPerSubscription");
+        return asyncPostRequest(path, Entity.entity(maxConsumersPerSubscription, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void removeMaxConsumersPerSubscription(String topic) throws PulsarAdminException {
+        try {
+            removeMaxConsumersPerSubscriptionAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeMaxConsumersPerSubscriptionAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "maxConsumersPerSubscription");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public Integer getMaxProducers(String topic) throws PulsarAdminException {
         try {
             return getMaxProducersAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 01e0394..54bf2e7 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -147,9 +147,15 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("get-publish-rate", new GetPublishRate());
         jcommander.addCommand("set-publish-rate", new SetPublishRate());
         jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
-        jcommander.addCommand("get-max-producers", new GetMaxProducers());
-        jcommander.addCommand("set-max-producers", new SetMaxProducers());
-        jcommander.addCommand("remove-max-producers", new RemoveMaxProducers());
+
+        jcommander.addCommand("get-maxProducers", new GetMaxProducers());
+        jcommander.addCommand("set-maxProducers", new SetMaxProducers());
+        jcommander.addCommand("remove-maxProducers", new RemoveMaxProducers());
+
+        jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
+        jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription());
+        jcommander.addCommand("remove-max-consumers-per-subscription", new RemoveMaxConsumersPerSubscription());
+
         jcommander.addCommand("get-inactive-topic-policies", new GetInactiveTopicPolicies());
         jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
         jcommander.addCommand("remove-inactive-topic-policies", new RemoveInactiveTopicPolicies());
@@ -1505,6 +1511,45 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get max consumers per subscription for a topic")
+    private class GetMaxConsumersPerSubscription extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(admin.topics().getMaxConsumersPerSubscription(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set max consumers per subscription for a topic")
+    private class SetMaxConsumersPerSubscription extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--max-consumers-per-subscription", "-c" }, description = "maxConsumersPerSubscription for a namespace", required = true)
+        private int maxConsumersPerSubscription;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().setMaxConsumersPerSubscription(persistentTopic, maxConsumersPerSubscription);
+        }
+    }
+
+    @Parameters(commandDescription = "Remove max consumers per subscription for a topic")
+    private class RemoveMaxConsumersPerSubscription extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().removeMaxConsumersPerSubscription(persistentTopic);
+        }
+    }
+
     @Parameters(commandDescription = "Get the inactive topic policies on a topic")
     private class GetInactiveTopicPolicies extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", required = true)