You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/26 04:58:19 UTC

[GitHub] merlimat closed pull request #1255: Add namespace policy for max clients

merlimat closed pull request #1255: Add namespace policy for max clients
URL: https://github.com/apache/incubator-pulsar/pull/1255
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0dda3aed0..7b007d9ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1242,5 +1242,129 @@ private void validatePolicies(NamespaceName ns, Policies policies) {
         }
     }
 
+
+    protected int internalGetMaxProducersPerTopic() {
+        validateAdminAccessOnProperty(namespaceName.getProperty());
+        return getNamespacePolicies(namespaceName).max_producers_per_topic;
+    }
+
+    protected void internalSetMaxProducersPerTopic(int maxProducersPerTopic) {
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, namespaceName.toString());
+            byte[] content = globalZk().getData(path, null, nodeStat);
+            Policies policies = jsonMapper().readValue(content, Policies.class);
+            if (maxProducersPerTopic < 0) {
+                throw new RestException(Status.PRECONDITION_FAILED,
+                        "maxProducersPerTopic must be 0 or more");
+            }
+            policies.max_producers_per_topic = maxProducersPerTopic;
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
+            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+            log.info("[{}] Successfully updated maxProducersPerTopic configuration: namespace={}, value={}", clientAppId(),
+                    namespaceName, policies.max_producers_per_topic);
+
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update maxProducersPerTopic configuration for namespace {}: does not exist", clientAppId(),
+                    namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to update maxProducersPerTopic configuration for namespace {}: concurrent modification",
+                    clientAppId(), namespaceName);
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (RestException pfe) {
+            throw pfe;
+        } catch (Exception e) {
+            log.error("[{}] Failed to update maxProducersPerTopic configuration for namespace {}", clientAppId(), namespaceName,
+                    e);
+            throw new RestException(e);
+        }
+    }
+
+    protected int internalGetMaxConsumersPerTopic() {
+        validateAdminAccessOnProperty(namespaceName.getProperty());
+        return getNamespacePolicies(namespaceName).max_consumers_per_topic;
+    }
+
+    protected void internalSetMaxConsumersPerTopic(int maxConsumersPerTopic) {
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, namespaceName.toString());
+            byte[] content = globalZk().getData(path, null, nodeStat);
+            Policies policies = jsonMapper().readValue(content, Policies.class);
+            if (maxConsumersPerTopic < 0) {
+                throw new RestException(Status.PRECONDITION_FAILED,
+                        "maxConsumersPerTopic must be 0 or more");
+            }
+            policies.max_consumers_per_topic = maxConsumersPerTopic;
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
+            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+            log.info("[{}] Successfully updated maxConsumersPerTopic configuration: namespace={}, value={}", clientAppId(),
+                    namespaceName, policies.max_consumers_per_topic);
+
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}: does not exist", clientAppId(),
+                    namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}: concurrent modification",
+                    clientAppId(), namespaceName);
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (RestException pfe) {
+            throw pfe;
+        } catch (Exception e) {
+            log.error("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}", clientAppId(), namespaceName,
+                    e);
+            throw new RestException(e);
+        }
+    }
+
+    protected int internalGetMaxConsumersPerSubscription() {
+        validateAdminAccessOnProperty(namespaceName.getProperty());
+        return getNamespacePolicies(namespaceName).max_consumers_per_subscription;
+    }
+
+    protected void internalSetMaxConsumersPerSubscription(int maxConsumersPerSubscription) {
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, namespaceName.toString());
+            byte[] content = globalZk().getData(path, null, nodeStat);
+            Policies policies = jsonMapper().readValue(content, Policies.class);
+            if (maxConsumersPerSubscription < 0) {
+                throw new RestException(Status.PRECONDITION_FAILED,
+                        "maxConsumersPerSubscription must be 0 or more");
+            }
+            policies.max_consumers_per_subscription = maxConsumersPerSubscription;
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
+            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+            log.info("[{}] Successfully updated maxConsumersPerSubscription configuration: namespace={}, value={}", clientAppId(),
+                    namespaceName, policies.max_consumers_per_subscription);
+
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}: does not exist", clientAppId(),
+                    namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}: concurrent modification",
+                    clientAppId(), namespaceName);
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (RestException pfe) {
+            throw pfe;
+        } catch (Exception e) {
+            log.error("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}", clientAppId(), namespaceName,
+                    e);
+            throw new RestException(e);
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 3ae1d4bca..a7d3d3f2c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -707,5 +707,78 @@ public void modifyEncryptionRequired(@PathParam("property") String property, @Pa
         internalModifyEncryptionRequired(encryptionRequired);
     }
 
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/maxProducersPerTopic")
+    @ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist") })
+    public int getMaxProducersPerTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, cluster, namespace);
+        return internalGetMaxProducersPerTopic();
+    }
+
+    @POST
+    @Path("/{property}/{cluster}/{namespace}/maxProducersPerTopic")
+    @ApiOperation(value = " Set maxProducersPerTopic configuration on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "maxProducersPerTopic value is not valid") })
+    public void setMaxProducersPerTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, int maxProducersPerTopic) {
+        validateNamespaceName(property, cluster, namespace);
+        internalSetMaxProducersPerTopic(maxProducersPerTopic);
+    }
+
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/maxConsumersPerTopic")
+    @ApiOperation(value = "Get maxConsumersPerTopic config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist") })
+    public int getMaxConsumersPerTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, cluster, namespace);
+        return internalGetMaxConsumersPerTopic();
+    }
+
+    @POST
+    @Path("/{property}/{cluster}/{namespace}/maxConsumersPerTopic")
+    @ApiOperation(value = " Set maxConsumersPerTopic configuration on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "maxConsumersPerTopic value is not valid") })
+    public void setMaxConsumersPerTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, int maxConsumersPerTopic) {
+        validateNamespaceName(property, cluster, namespace);
+        internalSetMaxConsumersPerTopic(maxConsumersPerTopic);
+    }
+
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/maxConsumersPerSubscription")
+    @ApiOperation(value = "Get maxConsumersPerSubscription config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist") })
+    public int getMaxConsumersPerSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, cluster, namespace);
+        return internalGetMaxConsumersPerSubscription();
+    }
+
+    @POST
+    @Path("/{property}/{cluster}/{namespace}/maxConsumersPerSubscription")
+    @ApiOperation(value = " Set maxConsumersPerSubscription configuration on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "maxConsumersPerSubscription value is not valid") })
+    public void setMaxConsumersPerSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, int maxConsumersPerSubscription) {
+        validateNamespaceName(property, cluster, namespace);
+        internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription);
+    }
+
+
     private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index d00499a4b..7a0f4a601 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -504,6 +504,78 @@ public void modifyEncryptionRequired(@PathParam("property") String property,
         internalModifyEncryptionRequired(encryptionRequired);
     }
 
+    @GET
+    @Path("/{property}/{namespace}/maxProducersPerTopic")
+    @ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist") })
+    public int getMaxProducersPerTopic(@PathParam("property") String property,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, namespace);
+        return internalGetMaxProducersPerTopic();
+    }
+
+    @POST
+    @Path("/{property}/{namespace}/maxProducersPerTopic")
+    @ApiOperation(value = " Set maxProducersPerTopic configuration on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "maxProducersPerTopic value is not valid") })
+    public void setMaxProducersPerTopic(@PathParam("property") String property, @PathParam("namespace") String namespace,
+            int maxProducersPerTopic) {
+        validateNamespaceName(property, namespace);
+        internalSetMaxProducersPerTopic(maxProducersPerTopic);
+    }
+
+    @GET
+    @Path("/{property}/{namespace}/maxConsumersPerTopic")
+    @ApiOperation(value = "Get maxConsumersPerTopic config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist") })
+    public int getMaxConsumersPerTopic(@PathParam("property") String property,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, namespace);
+        return internalGetMaxConsumersPerTopic();
+    }
+
+    @POST
+    @Path("/{property}/{namespace}/maxConsumersPerTopic")
+    @ApiOperation(value = " Set maxConsumersPerTopic configuration on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "maxConsumersPerTopic value is not valid") })
+    public void setMaxConsumersPerTopic(@PathParam("property") String property, @PathParam("namespace") String namespace,
+            int maxConsumersPerTopic) {
+        validateNamespaceName(property, namespace);
+        internalSetMaxConsumersPerTopic(maxConsumersPerTopic);
+    }
+
+    @GET
+    @Path("/{property}/{namespace}/maxConsumersPerSubscription")
+    @ApiOperation(value = "Get maxConsumersPerSubscription config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist") })
+    public int getMaxConsumersPerSubscription(@PathParam("property") String property,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, namespace);
+        return internalGetMaxConsumersPerSubscription();
+    }
+
+    @POST
+    @Path("/{property}/{namespace}/maxConsumersPerSubscription")
+    @ApiOperation(value = " Set maxConsumersPerSubscription configuration on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "maxConsumersPerSubscription value is not valid") })
+    public void setMaxConsumersPerSubscription(@PathParam("property") String property, @PathParam("namespace") String namespace,
+            int maxConsumersPerSubscription) {
+        validateNamespaceName(property, namespace);
+        internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription);
+    }
+
     private Policies getDefaultPolicesIfNull(Policies policies) {
         if (policies != null) {
             return policies;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 5b68c673f..092dc3617 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry;
 
 import java.util.List;
@@ -25,10 +26,13 @@
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.policies.data.Policies;
 
 public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher {
 
@@ -63,7 +67,17 @@ public void sendMessages(List<Entry> entries) {
     }
 
     protected boolean isConsumersExceededOnTopic() {
-        final int maxConsumersPerTopic = serviceConfig.getMaxConsumersPerTopic();
+        Policies policies;
+        try {
+            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxConsumersPerTopic = policies.max_consumers_per_topic > 0 ?
+                policies.max_consumers_per_topic :
+                serviceConfig.getMaxConsumersPerTopic();
         if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= topic.getNumberOfConsumers()) {
             return true;
         }
@@ -71,7 +85,17 @@ protected boolean isConsumersExceededOnTopic() {
     }
 
     protected boolean isConsumersExceededOnSubscription() {
-        final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
+        Policies policies;
+        try {
+            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
+                policies.max_consumers_per_subscription :
+                serviceConfig.getMaxConsumersPerSubscription();
         if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
             return true;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index fdbadc67b..ec778d17c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -262,7 +262,17 @@ public void addProducer(Producer producer) throws BrokerServiceException {
     }
 
     private boolean isProducersExceeded() {
-        final int maxProducers = brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
+        Policies policies;
+        try {
+            policies =  brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxProducers = policies.max_producers_per_topic > 0 ?
+                policies.max_producers_per_topic :
+                brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
         if (maxProducers > 0 && maxProducers <= producers.size()) {
             return true;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 518946564..2e59811b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static java.util.stream.Collectors.toSet;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 
 import java.util.List;
@@ -36,6 +37,7 @@
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
@@ -44,6 +46,8 @@
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
@@ -132,7 +136,17 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
     }
 
     private boolean isConsumersExceededOnTopic() {
-        final int maxConsumersPerTopic = serviceConfig.getMaxConsumersPerTopic();
+        Policies policies;
+        try {
+            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, DestinationName.get(topic.getName()).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxConsumersPerTopic = policies.max_consumers_per_topic > 0 ?
+                policies.max_consumers_per_topic :
+                serviceConfig.getMaxConsumersPerTopic();
         if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= topic.getNumberOfConsumers()) {
             return true;
         }
@@ -140,7 +154,17 @@ private boolean isConsumersExceededOnTopic() {
     }
 
     private boolean isConsumersExceededOnSubscription() {
-        final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
+        Policies policies;
+        try {
+            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, DestinationName.get(topic.getName()).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
+                policies.max_consumers_per_subscription :
+                serviceConfig.getMaxConsumersPerSubscription();
         if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) {
             return true;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 716e3325e..656beffbf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -19,9 +19,11 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ScheduledFuture;
 
@@ -33,12 +35,15 @@
 import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,7 +113,17 @@ protected void scheduleReadOnActiveConsumer() {
     }
 
     protected boolean isConsumersExceededOnTopic() {
-        final int maxConsumersPerTopic = serviceConfig.getMaxConsumersPerTopic();
+        Policies policies;
+        try {
+            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxConsumersPerTopic = policies.max_consumers_per_topic > 0 ?
+                policies.max_consumers_per_topic :
+                serviceConfig.getMaxConsumersPerTopic();
         if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= topic.getNumberOfConsumers()) {
             return true;
         }
@@ -116,7 +131,17 @@ protected boolean isConsumersExceededOnTopic() {
     }
 
     protected boolean isConsumersExceededOnSubscription() {
-        final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
+        Policies policies;
+        try {
+            policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
+                policies.max_consumers_per_subscription :
+                serviceConfig.getMaxConsumersPerSubscription();
         if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
             return true;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5c82ef380..1cb94c35d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -345,7 +345,17 @@ public void addProducer(Producer producer) throws BrokerServiceException {
     }
 
     private boolean isProducersExceeded() {
-        final int maxProducers = brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
+        Policies policies;
+        try {
+            policies =  brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxProducers = policies.max_producers_per_topic > 0 ?
+                policies.max_producers_per_topic :
+                brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
         if (maxProducers > 0 && maxProducers <= producers.size()) {
             return true;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 55019a0d6..bb6bb5455 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -362,15 +362,8 @@ public void testAddRemoveProducer() throws Exception {
         topic.removeProducer(producer); /* noop */
     }
 
-    @Test
     public void testMaxProducers() throws Exception {
-        // set max clients
-        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
-        doReturn(2).when(svcConfig).getMaxProducersPerTopic();
-        doReturn(svcConfig).when(pulsar).getConfiguration();
-
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-
         String role = "appid1";
         // 1. add producer1
         Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role, false, null);
@@ -392,6 +385,28 @@ public void testMaxProducers() throws Exception {
         }
     }
 
+    @Test
+    public void testMaxProducersForBroker() throws Exception {
+        // set max clients
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(2).when(svcConfig).getMaxProducersPerTopic();
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+        testMaxProducers();
+    }
+
+    @Test
+    public void testMaxProducersForNamespace() throws Exception {
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+        // set max clients
+        Policies policies = new Policies();
+        policies.max_producers_per_topic = 2;
+        when(pulsar.getConfigurationCache().policiesCache()
+                .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace())))
+                .thenReturn(Optional.of(policies));
+        testMaxProducers();
+    }
+
     @Test
     public void testSubscribeFail() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
@@ -474,14 +489,7 @@ public void testAddRemoveConsumer() throws Exception {
         }
     }
 
-    @Test
     public void testMaxConsumersShared() throws Exception {
-        // set max clients
-        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
-        doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
-        doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
-        doReturn(svcConfig).when(pulsar).getConfiguration();
-
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
         PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
         PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock);
@@ -545,13 +553,34 @@ public void testMaxConsumersShared() throws Exception {
     }
 
     @Test
-    public void testMaxConsumersFailover() throws Exception {
+    public void testMaxConsumersSharedForBroker() throws Exception {
         // set max clients
         ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
         doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
         doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
         doReturn(svcConfig).when(pulsar).getConfiguration();
 
+        testMaxConsumersShared();
+    }
+
+    @Test
+    public void testMaxConsumersSharedForNamespace() throws Exception {
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+
+        // set max clients
+        Policies policies = new Policies();
+        policies.max_consumers_per_subscription = 2;
+        policies.max_consumers_per_topic = 3;
+        when(pulsar.getConfigurationCache().policiesCache()
+                .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace())))
+                .thenReturn(Optional.of(policies));
+
+        testMaxConsumersShared();
+    }
+
+    public void testMaxConsumersFailover() throws Exception {
+
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
         PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
         PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock);
@@ -614,6 +643,33 @@ public void testMaxConsumersFailover() throws Exception {
         }
     }
 
+    @Test
+    public void testMaxConsumersFailoverForBroker() throws Exception {
+        // set max clients
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
+        doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+
+        testMaxConsumersFailover();
+    }
+
+    @Test
+    public void testMaxConsumersFailoverForNamespace() throws Exception {
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+
+        // set max clients
+        Policies policies = new Policies();
+        policies.max_consumers_per_subscription = 2;
+        policies.max_consumers_per_topic = 3;
+        when(pulsar.getConfigurationCache().policiesCache()
+                .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace())))
+                .thenReturn(Optional.of(policies));
+
+        testMaxConsumersFailover();
+    }
+
     @Test
     public void testUbsubscribeRaceConditions() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 3c8d3d51e..889ae934d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -917,4 +917,136 @@ void clearNamespaceBundleBacklogForSubscription(String namespace, String bundle,
      * @throws PulsarAdminException
      */
     void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscriptionAuthMode) throws PulsarAdminException;
+
+    /**
+     * Get the maxProducersPerTopic for a namespace.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>0</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    int getMaxProducersPerTopic(String namespace) throws PulsarAdminException;
+
+    /**
+     * Set maxProducersPerTopic for a namespace.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>10</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param maxProducersPerTopic
+     *            maxProducersPerTopic value for a namespace
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setMaxProducersPerTopic(String namespace, int maxProducersPerTopic) throws PulsarAdminException;
+
+    /**
+     * Get the maxProducersPerTopic for a namespace.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>0</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    int getMaxConsumersPerTopic(String namespace) throws PulsarAdminException;
+
+    /**
+     * Set maxConsumersPerTopic for a namespace.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>10</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param maxConsumersPerTopic
+     *            maxConsumersPerTopic value for a namespace
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setMaxConsumersPerTopic(String namespace, int maxConsumersPerTopic) throws PulsarAdminException;
+
+    /**
+     * Get the maxConsumersPerSubscription for a namespace.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>0</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException;
+
+    /**
+     * Set maxConsumersPerSubscription for a namespace.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>10</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param maxConsumersPerSubscription
+     *            maxConsumersPerSubscription value for a namespace
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setMaxConsumersPerSubscription(String namespace, int maxConsumersPerSubscription) throws PulsarAdminException;
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 3f0c7e71c..a01e66b0f 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -534,4 +534,73 @@ public void setEncryptionRequiredStatus(String namespace, boolean encryptionRequ
             throw getApiException(e);
         }
     }
+
+    @Override
+    public int getMaxProducersPerTopic(String namespace) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            return request(
+                    namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxProducersPerTopic"))
+                    .get(Integer.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setMaxProducersPerTopic(String namespace, int maxProducersPerTopic) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxProducersPerTopic"))
+                    .post(Entity.entity(maxProducersPerTopic, MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public int getMaxConsumersPerTopic(String namespace) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            return request(
+                    namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxConsumersPerTopic"))
+                    .get(Integer.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setMaxConsumersPerTopic(String namespace, int maxConsumersPerTopic) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxConsumersPerTopic"))
+                    .post(Entity.entity(maxConsumersPerTopic, MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            return request(
+                    namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxConsumersPerSubscription"))
+                    .get(Integer.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setMaxConsumersPerSubscription(String namespace, int maxConsumersPerSubscription) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxConsumersPerSubscription"))
+                    .post(Entity.entity(maxConsumersPerSubscription, MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 6e84bf629..74ce03be2 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -611,6 +611,87 @@ void run() throws Exception {
         }
     }
 
+    @Parameters(commandDescription = "Get maxProducersPerTopic for a namespace")
+    private class GetMaxProducersPerTopic extends CliCommand {
+        @Parameter(description = "property/cluster/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getMaxProducersPerTopic(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Set maxProducersPerTopic for a namespace")
+    private class SetMaxProducersPerTopic extends CliCommand {
+        @Parameter(description = "property/cluster/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--max-producers-per-topic", "-p" }, description = "maxProducersPerTopic for a namespace", required = true)
+        private int maxProducersPerTopic;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setMaxProducersPerTopic(namespace, maxProducersPerTopic);
+        }
+    }
+
+    @Parameters(commandDescription = "Get maxConsumersPerTopic for a namespace")
+    private class GetMaxConsumersPerTopic extends CliCommand {
+        @Parameter(description = "property/cluster/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getMaxConsumersPerTopic(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Set maxConsumersPerTopic for a namespace")
+    private class SetMaxConsumersPerTopic extends CliCommand {
+        @Parameter(description = "property/cluster/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--max-consumers-per-topic", "-c" }, description = "maxConsumersPerTopic for a namespace", required = true)
+        private int maxConsumersPerTopic;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setMaxConsumersPerTopic(namespace, maxConsumersPerTopic);
+        }
+    }
+
+    @Parameters(commandDescription = "Get maxConsumersPerSubscription for a namespace")
+    private class GetMaxConsumersPerSubscription extends CliCommand {
+        @Parameter(description = "property/cluster/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getMaxConsumersPerSubscription(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Set maxConsumersPerSubscription for a namespace")
+    private class SetMaxConsumersPerSubscription extends CliCommand {
+        @Parameter(description = "property/cluster/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--max-consumers-per-subscription", "-c" }, description = "maxConsumersPerSubscription for a namespace", required = true)
+        private int maxConsumersPerSubscription;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setMaxConsumersPerSubscription(namespace, maxConsumersPerSubscription);
+        }
+    }
+
     private static long validateSizeString(String s) {
         char last = s.charAt(s.length() - 1);
         String subStr = s.substring(0, s.length() - 1);
@@ -710,5 +791,12 @@ public CmdNamespaces(PulsarAdmin admin) {
 
         jcommander.addCommand("set-encryption-required", new SetEncryptionRequired());
         jcommander.addCommand("set-subscription-auth-mode", new SetSubscriptionAuthMode());
+
+        jcommander.addCommand("get-max-producers-per-topic", new GetMaxProducersPerTopic());
+        jcommander.addCommand("set-max-producers-per-topic", new SetMaxProducersPerTopic());
+        jcommander.addCommand("get-max-consumers-per-topic", new GetMaxConsumersPerTopic());
+        jcommander.addCommand("set-max-consumers-per-topic", new SetMaxConsumersPerTopic());
+        jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
+        jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription());
     }
 }
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 00f18a977..fe94ccb90 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -359,6 +359,28 @@ void namespaces() throws Exception {
 
         namespaces.run(split("unsubscribe -b 0x80000000_0xffffffff -s my-sub myprop/clust/ns1"));
         verify(mockNamespaces).unsubscribeNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff", "my-sub");
+
+        mockNamespaces = mock(Namespaces.class);
+        when(admin.namespaces()).thenReturn(mockNamespaces);
+        namespaces = new CmdNamespaces(admin);
+
+        namespaces.run(split("get-max-producers-per-topic myprop/clust/ns1"));
+        verify(mockNamespaces).getMaxProducersPerTopic("myprop/clust/ns1");
+
+        namespaces.run(split("set-max-producers-per-topic myprop/clust/ns1 -p 1"));
+        verify(mockNamespaces).setMaxProducersPerTopic("myprop/clust/ns1", 1);
+
+        namespaces.run(split("get-max-consumers-per-topic myprop/clust/ns1"));
+        verify(mockNamespaces).getMaxConsumersPerTopic("myprop/clust/ns1");
+
+        namespaces.run(split("set-max-consumers-per-topic myprop/clust/ns1 -c 2"));
+        verify(mockNamespaces).setMaxConsumersPerTopic("myprop/clust/ns1", 2);
+
+        namespaces.run(split("get-max-consumers-per-subscription myprop/clust/ns1"));
+        verify(mockNamespaces).getMaxConsumersPerSubscription("myprop/clust/ns1");
+
+        namespaces.run(split("set-max-consumers-per-subscription myprop/clust/ns1 -c 3"));
+        verify(mockNamespaces).setMaxConsumersPerSubscription("myprop/clust/ns1", 3);
     }
 
     @Test
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index f3486b859..03a8ce3d9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -50,6 +50,10 @@
     public boolean encryption_required = false;
     public SubscriptionAuthMode subscription_auth_mode = SubscriptionAuthMode.None;
 
+    public int max_producers_per_topic = 0;
+    public int max_consumers_per_topic = 0;
+    public int max_consumers_per_subscription = 0;
+
     @Override
     public boolean equals(Object obj) {
         if (obj instanceof Policies) {
@@ -65,7 +69,10 @@ public boolean equals(Object obj) {
                     && Objects.equals(retention_policies, other.retention_policies)
                     && Objects.equals(encryption_required, other.encryption_required)
                     && Objects.equals(subscription_auth_mode, other.subscription_auth_mode)
-                    && Objects.equals(antiAffinityGroup, other.antiAffinityGroup);
+                    && Objects.equals(antiAffinityGroup, other.antiAffinityGroup)
+                    && max_producers_per_topic == other.max_producers_per_topic
+                    && max_consumers_per_topic == other.max_consumers_per_topic
+                    && max_consumers_per_subscription == other.max_consumers_per_subscription;
         }
 
         return false;
@@ -92,6 +99,9 @@ public String toString() {
                 .add("message_ttl_in_seconds", message_ttl_in_seconds).add("retention_policies", retention_policies)
                 .add("deleted", deleted)
                 .add("encryption_required", encryption_required)
-                .add("subscription_auth_mode", subscription_auth_mode).toString();
+                .add("subscription_auth_mode", subscription_auth_mode)
+                .add("max_producers_per_topic", max_producers_per_topic)
+                .add("max_consumers_per_topic", max_consumers_per_topic)
+                .add("max_consumers_per_subscription", max_consumers_per_topic).toString();
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services