You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/26 04:58:19 UTC

[incubator-pulsar] branch master updated: Add namespace policy for max clients (#1255)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cd6f539  Add namespace policy for max clients (#1255)
cd6f539 is described below

commit cd6f5396853f984e6fe60dad95b635cbdc7143dc
Author: yush1ga <y....@gmail.com>
AuthorDate: Mon Feb 26 13:58:17 2018 +0900

    Add namespace policy for max clients (#1255)
    
    * Add namespace policy for max clients
    
    * Addressed PR comments
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 124 +++++++++++++++++++
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  73 ++++++++++++
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  72 +++++++++++
 ...onPersistentDispatcherSingleActiveConsumer.java |  28 ++++-
 .../service/nonpersistent/NonPersistentTopic.java  |  12 +-
 .../PersistentDispatcherMultipleConsumers.java     |  28 ++++-
 .../PersistentDispatcherSingleActiveConsumer.java  |  29 ++++-
 .../broker/service/persistent/PersistentTopic.java |  12 +-
 .../pulsar/broker/service/PersistentTopicTest.java |  86 +++++++++++---
 .../org/apache/pulsar/client/admin/Namespaces.java | 132 +++++++++++++++++++++
 .../client/admin/internal/NamespacesImpl.java      |  69 +++++++++++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  88 ++++++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  22 ++++
 .../pulsar/common/policies/data/Policies.java      |  14 ++-
 14 files changed, 764 insertions(+), 25 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 902a56f..5ecca24 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1242,5 +1242,129 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+
+    protected int internalGetMaxProducersPerTopic() {
+        validateAdminAccessOnProperty(namespaceName.getProperty());
+        return getNamespacePolicies(namespaceName).max_producers_per_topic;
+    }
+
+    protected void internalSetMaxProducersPerTopic(int maxProducersPerTopic) {
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, namespaceName.toString());
+            byte[] content = globalZk().getData(path, null, nodeStat);
+            Policies policies = jsonMapper().readValue(content, Policies.class);
+            if (maxProducersPerTopic < 0) {
+                throw new RestException(Status.PRECONDITION_FAILED,
+                        "maxProducersPerTopic must be 0 or more");
+            }
+            policies.max_producers_per_topic = maxProducersPerTopic;
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
+            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+            log.info("[{}] Successfully updated maxProducersPerTopic configuration: namespace={}, value={}", clientAppId(),
+                    namespaceName, policies.max_producers_per_topic);
+
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update maxProducersPerTopic configuration for namespace {}: does not exist", clientAppId(),
+                    namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to update maxProducersPerTopic configuration for namespace {}: concurrent modification",
+                    clientAppId(), namespaceName);
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (RestException pfe) {
+            throw pfe;
+        } catch (Exception e) {
+            log.error("[{}] Failed to update maxProducersPerTopic configuration for namespace {}", clientAppId(), namespaceName,
+                    e);
+            throw new RestException(e);
+        }
+    }
+
+    protected int internalGetMaxConsumersPerTopic() {
+        validateAdminAccessOnProperty(namespaceName.getProperty());
+        return getNamespacePolicies(namespaceName).max_consumers_per_topic;
+    }
+
+    protected void internalSetMaxConsumersPerTopic(int maxConsumersPerTopic) {
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, namespaceName.toString());
+            byte[] content = globalZk().getData(path, null, nodeStat);
+            Policies policies = jsonMapper().readValue(content, Policies.class);
+            if (maxConsumersPerTopic < 0) {
+                throw new RestException(Status.PRECONDITION_FAILED,
+                        "maxConsumersPerTopic must be 0 or more");
+            }
+            policies.max_consumers_per_topic = maxConsumersPerTopic;
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
+            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+            log.info("[{}] Successfully updated maxConsumersPerTopic configuration: namespace={}, value={}", clientAppId(),
+                    namespaceName, policies.max_consumers_per_topic);
+
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}: does not exist", clientAppId(),
+                    namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}: concurrent modification",
+                    clientAppId(), namespaceName);
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (RestException pfe) {
+            throw pfe;
+        } catch (Exception e) {
+            log.error("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}", clientAppId(), namespaceName,
+                    e);
+            throw new RestException(e);
+        }
+    }
+
+    protected int internalGetMaxConsumersPerSubscription() {
+        validateAdminAccessOnProperty(namespaceName.getProperty());
+        return getNamespacePolicies(namespaceName).max_consumers_per_subscription;
+    }
+
+    protected void internalSetMaxConsumersPerSubscription(int maxConsumersPerSubscription) {
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, namespaceName.toString());
+            byte[] content = globalZk().getData(path, null, nodeStat);
+            Policies policies = jsonMapper().readValue(content, Policies.class);
+            if (maxConsumersPerSubscription < 0) {
+                throw new RestException(Status.PRECONDITION_FAILED,
+                        "maxConsumersPerSubscription must be 0 or more");
+            }
+            policies.max_consumers_per_subscription = maxConsumersPerSubscription;
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
+            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+            log.info("[{}] Successfully updated maxConsumersPerSubscription configuration: namespace={}, value={}", clientAppId(),
+                    namespaceName, policies.max_consumers_per_subscription);
+
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}: does not exist", clientAppId(),
+                    namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}: concurrent modification",
+                    clientAppId(), namespaceName);
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (RestException pfe) {
+            throw pfe;
+        } catch (Exception e) {
+            log.error("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}", clientAppId(), namespaceName,
+                    e);
+            throw new RestException(e);
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index ddd3154..e3f5e43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -708,5 +708,78 @@ public class Namespaces extends NamespacesBase {
         internalModifyEncryptionRequired(encryptionRequired);
     }
 
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/maxProducersPerTopic")
+    @ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist") })
+    public int getMaxProducersPerTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, cluster, namespace);
+        return internalGetMaxProducersPerTopic();
+    }
+
+    @POST
+    @Path("/{property}/{cluster}/{namespace}/maxProducersPerTopic")
+    @ApiOperation(value = " Set maxProducersPerTopic configuration on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "maxProducersPerTopic value is not valid") })
+    public void setMaxProducersPerTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, int maxProducersPerTopic) {
+        validateNamespaceName(property, cluster, namespace);
+        internalSetMaxProducersPerTopic(maxProducersPerTopic);
+    }
+
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/maxConsumersPerTopic")
+    @ApiOperation(value = "Get maxConsumersPerTopic config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist") })
+    public int getMaxConsumersPerTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, cluster, namespace);
+        return internalGetMaxConsumersPerTopic();
+    }
+
+    @POST
+    @Path("/{property}/{cluster}/{namespace}/maxConsumersPerTopic")
+    @ApiOperation(value = " Set maxConsumersPerTopic configuration on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "maxConsumersPerTopic value is not valid") })
+    public void setMaxConsumersPerTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, int maxConsumersPerTopic) {
+        validateNamespaceName(property, cluster, namespace);
+        internalSetMaxConsumersPerTopic(maxConsumersPerTopic);
+    }
+
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/maxConsumersPerSubscription")
+    @ApiOperation(value = "Get maxConsumersPerSubscription config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist") })
+    public int getMaxConsumersPerSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, cluster, namespace);
+        return internalGetMaxConsumersPerSubscription();
+    }
+
+    @POST
+    @Path("/{property}/{cluster}/{namespace}/maxConsumersPerSubscription")
+    @ApiOperation(value = " Set maxConsumersPerSubscription configuration on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "maxConsumersPerSubscription value is not valid") })
+    public void setMaxConsumersPerSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, int maxConsumersPerSubscription) {
+        validateNamespaceName(property, cluster, namespace);
+        internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription);
+    }
+
+
     private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 873d761..c984e76 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -504,6 +504,78 @@ public class Namespaces extends NamespacesBase {
         internalModifyEncryptionRequired(encryptionRequired);
     }
 
+    @GET
+    @Path("/{property}/{namespace}/maxProducersPerTopic")
+    @ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist") })
+    public int getMaxProducersPerTopic(@PathParam("property") String property,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, namespace);
+        return internalGetMaxProducersPerTopic();
+    }
+
+    @POST
+    @Path("/{property}/{namespace}/maxProducersPerTopic")
+    @ApiOperation(value = " Set maxProducersPerTopic configuration on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "maxProducersPerTopic value is not valid") })
+    public void setMaxProducersPerTopic(@PathParam("property") String property, @PathParam("namespace") String namespace,
+            int maxProducersPerTopic) {
+        validateNamespaceName(property, namespace);
+        internalSetMaxProducersPerTopic(maxProducersPerTopic);
+    }
+
+    @GET
+    @Path("/{property}/{namespace}/maxConsumersPerTopic")
+    @ApiOperation(value = "Get maxConsumersPerTopic config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist") })
+    public int getMaxConsumersPerTopic(@PathParam("property") String property,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, namespace);
+        return internalGetMaxConsumersPerTopic();
+    }
+
+    @POST
+    @Path("/{property}/{namespace}/maxConsumersPerTopic")
+    @ApiOperation(value = " Set maxConsumersPerTopic configuration on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "maxConsumersPerTopic value is not valid") })
+    public void setMaxConsumersPerTopic(@PathParam("property") String property, @PathParam("namespace") String namespace,
+            int maxConsumersPerTopic) {
+        validateNamespaceName(property, namespace);
+        internalSetMaxConsumersPerTopic(maxConsumersPerTopic);
+    }
+
+    @GET
+    @Path("/{property}/{namespace}/maxConsumersPerSubscription")
+    @ApiOperation(value = "Get maxConsumersPerSubscription config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist") })
+    public int getMaxConsumersPerSubscription(@PathParam("property") String property,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, namespace);
+        return internalGetMaxConsumersPerSubscription();
+    }
+
+    @POST
+    @Path("/{property}/{namespace}/maxConsumersPerSubscription")
+    @ApiOperation(value = " Set maxConsumersPerSubscription configuration on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "maxConsumersPerSubscription value is not valid") })
+    public void setMaxConsumersPerSubscription(@PathParam("property") String property, @PathParam("namespace") String namespace,
+            int maxConsumersPerSubscription) {
+        validateNamespaceName(property, namespace);
+        internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription);
+    }
+
     private Policies getDefaultPolicesIfNull(Policies policies) {
         if (policies != null) {
             return policies;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 5b68c67..092dc36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry;
 
 import java.util.List;
@@ -25,10 +26,13 @@ import java.util.List;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.policies.data.Policies;
 
 public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher {
 
@@ -63,7 +67,17 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
     }
 
     protected boolean isConsumersExceededOnTopic() {
-        final int maxConsumersPerTopic = serviceConfig.getMaxConsumersPerTopic();
+        Policies policies;
+        try {
+            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxConsumersPerTopic = policies.max_consumers_per_topic > 0 ?
+                policies.max_consumers_per_topic :
+                serviceConfig.getMaxConsumersPerTopic();
         if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= topic.getNumberOfConsumers()) {
             return true;
         }
@@ -71,7 +85,17 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
     }
 
     protected boolean isConsumersExceededOnSubscription() {
-        final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
+        Policies policies;
+        try {
+            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
+                policies.max_consumers_per_subscription :
+                serviceConfig.getMaxConsumersPerSubscription();
         if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
             return true;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index a03ac6c..d8510df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -265,7 +265,17 @@ public class NonPersistentTopic implements Topic {
     }
 
     private boolean isProducersExceeded() {
-        final int maxProducers = brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
+        Policies policies;
+        try {
+            policies =  brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxProducers = policies.max_producers_per_topic > 0 ?
+                policies.max_producers_per_topic :
+                brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
         if (maxProducers > 0 && maxProducers <= producers.size()) {
             return true;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 5189465..2e59811 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static java.util.stream.Collectors.toSet;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 
 import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsExcep
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
@@ -44,6 +46,8 @@ import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
@@ -132,7 +136,17 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
     }
 
     private boolean isConsumersExceededOnTopic() {
-        final int maxConsumersPerTopic = serviceConfig.getMaxConsumersPerTopic();
+        Policies policies;
+        try {
+            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, DestinationName.get(topic.getName()).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxConsumersPerTopic = policies.max_consumers_per_topic > 0 ?
+                policies.max_consumers_per_topic :
+                serviceConfig.getMaxConsumersPerTopic();
         if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= topic.getNumberOfConsumers()) {
             return true;
         }
@@ -140,7 +154,17 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
     }
 
     private boolean isConsumersExceededOnSubscription() {
-        final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
+        Policies policies;
+        try {
+            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, DestinationName.get(topic.getName()).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
+                policies.max_consumers_per_subscription :
+                serviceConfig.getMaxConsumersPerSubscription();
         if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) {
             return true;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index a26d862..bd197e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -19,9 +19,11 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ScheduledFuture;
 
@@ -33,12 +35,15 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadE
 import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,7 +119,17 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
     }
 
     protected boolean isConsumersExceededOnTopic() {
-        final int maxConsumersPerTopic = serviceConfig.getMaxConsumersPerTopic();
+        Policies policies;
+        try {
+            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxConsumersPerTopic = policies.max_consumers_per_topic > 0 ?
+                policies.max_consumers_per_topic :
+                serviceConfig.getMaxConsumersPerTopic();
         if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= topic.getNumberOfConsumers()) {
             return true;
         }
@@ -122,7 +137,17 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
     }
 
     protected boolean isConsumersExceededOnSubscription() {
-        final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
+        Policies policies;
+        try {
+            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
+                policies.max_consumers_per_subscription :
+                serviceConfig.getMaxConsumersPerSubscription();
         if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
             return true;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6e6b5d8..4d5d974 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -348,7 +348,17 @@ public class PersistentTopic implements Topic, AddEntryCallback {
     }
 
     private boolean isProducersExceeded() {
-        final int maxProducers = brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
+        Policies policies;
+        try {
+            policies =  brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxProducers = policies.max_producers_per_topic > 0 ?
+                policies.max_producers_per_topic :
+                brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
         if (maxProducers > 0 && maxProducers <= producers.size()) {
             return true;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index b19b053..d0277ef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -365,15 +365,8 @@ public class PersistentTopicTest {
         topic.removeProducer(producer); /* noop */
     }
 
-    @Test
     public void testMaxProducers() throws Exception {
-        // set max clients
-        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
-        doReturn(2).when(svcConfig).getMaxProducersPerTopic();
-        doReturn(svcConfig).when(pulsar).getConfiguration();
-
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-
         String role = "appid1";
         // 1. add producer1
         Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role, false, null);
@@ -396,6 +389,28 @@ public class PersistentTopicTest {
     }
 
     @Test
+    public void testMaxProducersForBroker() throws Exception {
+        // set max clients
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(2).when(svcConfig).getMaxProducersPerTopic();
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+        testMaxProducers();
+    }
+
+    @Test
+    public void testMaxProducersForNamespace() throws Exception {
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+        // set max clients
+        Policies policies = new Policies();
+        policies.max_producers_per_topic = 2;
+        when(pulsar.getConfigurationCache().policiesCache()
+                .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace())))
+                .thenReturn(Optional.of(policies));
+        testMaxProducers();
+    }
+
+    @Test
     public void testSubscribeFail() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
 
@@ -477,14 +492,7 @@ public class PersistentTopicTest {
         }
     }
 
-    @Test
     public void testMaxConsumersShared() throws Exception {
-        // set max clients
-        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
-        doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
-        doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
-        doReturn(svcConfig).when(pulsar).getConfiguration();
-
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
         PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
         PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock);
@@ -548,13 +556,34 @@ public class PersistentTopicTest {
     }
 
     @Test
-    public void testMaxConsumersFailover() throws Exception {
+    public void testMaxConsumersSharedForBroker() throws Exception {
         // set max clients
         ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
         doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
         doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
         doReturn(svcConfig).when(pulsar).getConfiguration();
 
+        testMaxConsumersShared();
+    }
+
+    @Test
+    public void testMaxConsumersSharedForNamespace() throws Exception {
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+
+        // set max clients
+        Policies policies = new Policies();
+        policies.max_consumers_per_subscription = 2;
+        policies.max_consumers_per_topic = 3;
+        when(pulsar.getConfigurationCache().policiesCache()
+                .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace())))
+                .thenReturn(Optional.of(policies));
+
+        testMaxConsumersShared();
+    }
+
+    public void testMaxConsumersFailover() throws Exception {
+
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
         PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
         PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock);
@@ -618,6 +647,33 @@ public class PersistentTopicTest {
     }
 
     @Test
+    public void testMaxConsumersFailoverForBroker() throws Exception {
+        // set max clients
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
+        doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+
+        testMaxConsumersFailover();
+    }
+
+    @Test
+    public void testMaxConsumersFailoverForNamespace() throws Exception {
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+
+        // set max clients
+        Policies policies = new Policies();
+        policies.max_consumers_per_subscription = 2;
+        policies.max_consumers_per_topic = 3;
+        when(pulsar.getConfigurationCache().policiesCache()
+                .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace())))
+                .thenReturn(Optional.of(policies));
+
+        testMaxConsumersFailover();
+    }
+
+    @Test
     public void testUbsubscribeRaceConditions() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
         PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 65ce200..c8d3204 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -917,4 +917,136 @@ public interface Namespaces {
      * @throws PulsarAdminException
      */
     void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscriptionAuthMode) throws PulsarAdminException;
+
+    /**
+     * Get the maxProducersPerTopic for a namespace.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>0</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    int getMaxProducersPerTopic(String namespace) throws PulsarAdminException;
+
+    /**
+     * Set maxProducersPerTopic for a namespace.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>10</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param maxProducersPerTopic
+     *            maxProducersPerTopic value for a namespace
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setMaxProducersPerTopic(String namespace, int maxProducersPerTopic) throws PulsarAdminException;
+
+    /**
+     * Get the maxConsumersPerTopic for a namespace.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>0</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    int getMaxConsumersPerTopic(String namespace) throws PulsarAdminException;
+
+    /**
+     * Set maxConsumersPerTopic for a namespace.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>10</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param maxConsumersPerTopic
+     *            maxConsumersPerTopic value for a namespace
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setMaxConsumersPerTopic(String namespace, int maxConsumersPerTopic) throws PulsarAdminException;
+
+    /**
+     * Get the maxConsumersPerSubscription for a namespace.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>0</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException;
+
+    /**
+     * Set maxConsumersPerSubscription for a namespace.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>10</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param maxConsumersPerSubscription
+     *            maxConsumersPerSubscription value for a namespace
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setMaxConsumersPerSubscription(String namespace, int maxConsumersPerSubscription) throws PulsarAdminException;
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 6db9ab0..6d3ff3d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -534,4 +534,73 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
             throw getApiException(e);
         }
     }
+
+    @Override
+    public int getMaxProducersPerTopic(String namespace) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            return request(
+                    namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxProducersPerTopic"))
+                    .get(Integer.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setMaxProducersPerTopic(String namespace, int maxProducersPerTopic) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxProducersPerTopic"))
+                    .post(Entity.entity(maxProducersPerTopic, MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public int getMaxConsumersPerTopic(String namespace) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            return request(
+                    namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxConsumersPerTopic"))
+                    .get(Integer.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setMaxConsumersPerTopic(String namespace, int maxConsumersPerTopic) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxConsumersPerTopic"))
+                    .post(Entity.entity(maxConsumersPerTopic, MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            return request(
+                    namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxConsumersPerSubscription"))
+                    .get(Integer.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setMaxConsumersPerSubscription(String namespace, int maxConsumersPerSubscription) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxConsumersPerSubscription"))
+                    .post(Entity.entity(maxConsumersPerSubscription, MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 0f93708..9d93bca 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -611,6 +611,87 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get maxProducersPerTopic for a namespace")
+    private class GetMaxProducersPerTopic extends CliCommand {
+        @Parameter(description = "property/cluster/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getMaxProducersPerTopic(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Set maxProducersPerTopic for a namespace")
+    private class SetMaxProducersPerTopic extends CliCommand {
+        @Parameter(description = "property/cluster/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--max-producers-per-topic", "-p" }, description = "maxProducersPerTopic for a namespace", required = true)
+        private int maxProducersPerTopic;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setMaxProducersPerTopic(namespace, maxProducersPerTopic);
+        }
+    }
+
+    @Parameters(commandDescription = "Get maxConsumersPerTopic for a namespace")
+    private class GetMaxConsumersPerTopic extends CliCommand {
+        @Parameter(description = "property/cluster/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getMaxConsumersPerTopic(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Set maxConsumersPerTopic for a namespace")
+    private class SetMaxConsumersPerTopic extends CliCommand {
+        @Parameter(description = "property/cluster/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--max-consumers-per-topic", "-c" }, description = "maxConsumersPerTopic for a namespace", required = true)
+        private int maxConsumersPerTopic;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setMaxConsumersPerTopic(namespace, maxConsumersPerTopic);
+        }
+    }
+
+    @Parameters(commandDescription = "Get maxConsumersPerSubscription for a namespace")
+    private class GetMaxConsumersPerSubscription extends CliCommand {
+        @Parameter(description = "property/cluster/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getMaxConsumersPerSubscription(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Set maxConsumersPerSubscription for a namespace")
+    private class SetMaxConsumersPerSubscription extends CliCommand {
+        @Parameter(description = "property/cluster/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--max-consumers-per-subscription", "-c" }, description = "maxConsumersPerSubscription for a namespace", required = true)
+        private int maxConsumersPerSubscription;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setMaxConsumersPerSubscription(namespace, maxConsumersPerSubscription);
+        }
+    }
+
     private static long validateSizeString(String s) {
         char last = s.charAt(s.length() - 1);
         String subStr = s.substring(0, s.length() - 1);
@@ -711,5 +792,12 @@ public class CmdNamespaces extends CmdBase {
 
         jcommander.addCommand("set-encryption-required", new SetEncryptionRequired());
         jcommander.addCommand("set-subscription-auth-mode", new SetSubscriptionAuthMode());
+
+        jcommander.addCommand("get-max-producers-per-topic", new GetMaxProducersPerTopic());
+        jcommander.addCommand("set-max-producers-per-topic", new SetMaxProducersPerTopic());
+        jcommander.addCommand("get-max-consumers-per-topic", new GetMaxConsumersPerTopic());
+        jcommander.addCommand("set-max-consumers-per-topic", new SetMaxConsumersPerTopic());
+        jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
+        jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription());
     }
 }
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index cf90477..46fa7f7 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -365,6 +365,28 @@ public class PulsarAdminToolTest {
 
         namespaces.run(split("unsubscribe -b 0x80000000_0xffffffff -s my-sub myprop/clust/ns1"));
         verify(mockNamespaces).unsubscribeNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff", "my-sub");
+
+        mockNamespaces = mock(Namespaces.class);
+        when(admin.namespaces()).thenReturn(mockNamespaces);
+        namespaces = new CmdNamespaces(admin);
+
+        namespaces.run(split("get-max-producers-per-topic myprop/clust/ns1"));
+        verify(mockNamespaces).getMaxProducersPerTopic("myprop/clust/ns1");
+
+        namespaces.run(split("set-max-producers-per-topic myprop/clust/ns1 -p 1"));
+        verify(mockNamespaces).setMaxProducersPerTopic("myprop/clust/ns1", 1);
+
+        namespaces.run(split("get-max-consumers-per-topic myprop/clust/ns1"));
+        verify(mockNamespaces).getMaxConsumersPerTopic("myprop/clust/ns1");
+
+        namespaces.run(split("set-max-consumers-per-topic myprop/clust/ns1 -c 2"));
+        verify(mockNamespaces).setMaxConsumersPerTopic("myprop/clust/ns1", 2);
+
+        namespaces.run(split("get-max-consumers-per-subscription myprop/clust/ns1"));
+        verify(mockNamespaces).getMaxConsumersPerSubscription("myprop/clust/ns1");
+
+        namespaces.run(split("set-max-consumers-per-subscription myprop/clust/ns1 -c 3"));
+        verify(mockNamespaces).setMaxConsumersPerSubscription("myprop/clust/ns1", 3);
     }
 
     @Test
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index f3486b8..03a8ce3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -50,6 +50,10 @@ public class Policies {
     public boolean encryption_required = false;
     public SubscriptionAuthMode subscription_auth_mode = SubscriptionAuthMode.None;
 
+    public int max_producers_per_topic = 0;
+    public int max_consumers_per_topic = 0;
+    public int max_consumers_per_subscription = 0;
+
     @Override
     public boolean equals(Object obj) {
         if (obj instanceof Policies) {
@@ -65,7 +69,10 @@ public class Policies {
                     && Objects.equals(retention_policies, other.retention_policies)
                     && Objects.equals(encryption_required, other.encryption_required)
                     && Objects.equals(subscription_auth_mode, other.subscription_auth_mode)
-                    && Objects.equals(antiAffinityGroup, other.antiAffinityGroup);
+                    && Objects.equals(antiAffinityGroup, other.antiAffinityGroup)
+                    && max_producers_per_topic == other.max_producers_per_topic
+                    && max_consumers_per_topic == other.max_consumers_per_topic
+                    && max_consumers_per_subscription == other.max_consumers_per_subscription;
         }
 
         return false;
@@ -92,6 +99,9 @@ public class Policies {
                 .add("message_ttl_in_seconds", message_ttl_in_seconds).add("retention_policies", retention_policies)
                 .add("deleted", deleted)
                 .add("encryption_required", encryption_required)
-                .add("subscription_auth_mode", subscription_auth_mode).toString();
+                .add("subscription_auth_mode", subscription_auth_mode)
+                .add("max_producers_per_topic", max_producers_per_topic)
+                .add("max_consumers_per_topic", max_consumers_per_topic)
+                .add("max_consumers_per_subscription", max_consumers_per_subscription).toString();
     }
 }

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.