You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/26 04:58:19 UTC
[incubator-pulsar] branch master updated: Add namespace policy for
max clients (#1255)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cd6f539 Add namespace policy for max clients (#1255)
cd6f539 is described below
commit cd6f5396853f984e6fe60dad95b635cbdc7143dc
Author: yush1ga <y....@gmail.com>
AuthorDate: Mon Feb 26 13:58:17 2018 +0900
Add namespace policy for max clients (#1255)
* Add namespace policy for max clients
* Addressed PR comments
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 124 +++++++++++++++++++
.../apache/pulsar/broker/admin/v1/Namespaces.java | 73 ++++++++++++
.../apache/pulsar/broker/admin/v2/Namespaces.java | 72 +++++++++++
...onPersistentDispatcherSingleActiveConsumer.java | 28 ++++-
.../service/nonpersistent/NonPersistentTopic.java | 12 +-
.../PersistentDispatcherMultipleConsumers.java | 28 ++++-
.../PersistentDispatcherSingleActiveConsumer.java | 29 ++++-
.../broker/service/persistent/PersistentTopic.java | 12 +-
.../pulsar/broker/service/PersistentTopicTest.java | 86 +++++++++++---
.../org/apache/pulsar/client/admin/Namespaces.java | 132 +++++++++++++++++++++
.../client/admin/internal/NamespacesImpl.java | 69 +++++++++++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 88 ++++++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 22 ++++
.../pulsar/common/policies/data/Policies.java | 14 ++-
14 files changed, 764 insertions(+), 25 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 902a56f..5ecca24 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1242,5 +1242,129 @@ public abstract class NamespacesBase extends AdminResource {
}
}
+
+ protected int internalGetMaxProducersPerTopic() {
+ validateAdminAccessOnProperty(namespaceName.getProperty());
+ return getNamespacePolicies(namespaceName).max_producers_per_topic;
+ }
+
+ protected void internalSetMaxProducersPerTopic(int maxProducersPerTopic) {
+ validateSuperUserAccess();
+ validatePoliciesReadOnlyAccess();
+
+ try {
+ Stat nodeStat = new Stat();
+ final String path = path(POLICIES, namespaceName.toString());
+ byte[] content = globalZk().getData(path, null, nodeStat);
+ Policies policies = jsonMapper().readValue(content, Policies.class);
+ if (maxProducersPerTopic < 0) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "maxProducersPerTopic must be 0 or more");
+ }
+ policies.max_producers_per_topic = maxProducersPerTopic;
+ globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
+ policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+ log.info("[{}] Successfully updated maxProducersPerTopic configuration: namespace={}, value={}", clientAppId(),
+ namespaceName, policies.max_producers_per_topic);
+
+ } catch (KeeperException.NoNodeException e) {
+ log.warn("[{}] Failed to update maxProducersPerTopic configuration for namespace {}: does not exist", clientAppId(),
+ namespaceName);
+ throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+ } catch (KeeperException.BadVersionException e) {
+ log.warn("[{}] Failed to update maxProducersPerTopic configuration for namespace {}: concurrent modification",
+ clientAppId(), namespaceName);
+ throw new RestException(Status.CONFLICT, "Concurrent modification");
+ } catch (RestException pfe) {
+ throw pfe;
+ } catch (Exception e) {
+ log.error("[{}] Failed to update maxProducersPerTopic configuration for namespace {}", clientAppId(), namespaceName,
+ e);
+ throw new RestException(e);
+ }
+ }
+
+ protected int internalGetMaxConsumersPerTopic() {
+ validateAdminAccessOnProperty(namespaceName.getProperty());
+ return getNamespacePolicies(namespaceName).max_consumers_per_topic;
+ }
+
+ protected void internalSetMaxConsumersPerTopic(int maxConsumersPerTopic) {
+ validateSuperUserAccess();
+ validatePoliciesReadOnlyAccess();
+
+ try {
+ Stat nodeStat = new Stat();
+ final String path = path(POLICIES, namespaceName.toString());
+ byte[] content = globalZk().getData(path, null, nodeStat);
+ Policies policies = jsonMapper().readValue(content, Policies.class);
+ if (maxConsumersPerTopic < 0) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "maxConsumersPerTopic must be 0 or more");
+ }
+ policies.max_consumers_per_topic = maxConsumersPerTopic;
+ globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
+ policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+ log.info("[{}] Successfully updated maxConsumersPerTopic configuration: namespace={}, value={}", clientAppId(),
+ namespaceName, policies.max_consumers_per_topic);
+
+ } catch (KeeperException.NoNodeException e) {
+ log.warn("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}: does not exist", clientAppId(),
+ namespaceName);
+ throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+ } catch (KeeperException.BadVersionException e) {
+ log.warn("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}: concurrent modification",
+ clientAppId(), namespaceName);
+ throw new RestException(Status.CONFLICT, "Concurrent modification");
+ } catch (RestException pfe) {
+ throw pfe;
+ } catch (Exception e) {
+ log.error("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}", clientAppId(), namespaceName,
+ e);
+ throw new RestException(e);
+ }
+ }
+
+ protected int internalGetMaxConsumersPerSubscription() {
+ validateAdminAccessOnProperty(namespaceName.getProperty());
+ return getNamespacePolicies(namespaceName).max_consumers_per_subscription;
+ }
+
+ protected void internalSetMaxConsumersPerSubscription(int maxConsumersPerSubscription) {
+ validateSuperUserAccess();
+ validatePoliciesReadOnlyAccess();
+
+ try {
+ Stat nodeStat = new Stat();
+ final String path = path(POLICIES, namespaceName.toString());
+ byte[] content = globalZk().getData(path, null, nodeStat);
+ Policies policies = jsonMapper().readValue(content, Policies.class);
+ if (maxConsumersPerSubscription < 0) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "maxConsumersPerSubscription must be 0 or more");
+ }
+ policies.max_consumers_per_subscription = maxConsumersPerSubscription;
+ globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
+ policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+ log.info("[{}] Successfully updated maxConsumersPerSubscription configuration: namespace={}, value={}", clientAppId(),
+ namespaceName, policies.max_consumers_per_subscription);
+
+ } catch (KeeperException.NoNodeException e) {
+ log.warn("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}: does not exist", clientAppId(),
+ namespaceName);
+ throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+ } catch (KeeperException.BadVersionException e) {
+ log.warn("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}: concurrent modification",
+ clientAppId(), namespaceName);
+ throw new RestException(Status.CONFLICT, "Concurrent modification");
+ } catch (RestException pfe) {
+ throw pfe;
+ } catch (Exception e) {
+ log.error("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}", clientAppId(), namespaceName,
+ e);
+ throw new RestException(e);
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index ddd3154..e3f5e43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -708,5 +708,78 @@ public class Namespaces extends NamespacesBase {
internalModifyEncryptionRequired(encryptionRequired);
}
+ @GET
+ @Path("/{property}/{cluster}/{namespace}/maxProducersPerTopic")
+ @ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist") })
+ public int getMaxProducersPerTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(property, cluster, namespace);
+ return internalGetMaxProducersPerTopic();
+ }
+
+ @POST
+ @Path("/{property}/{cluster}/{namespace}/maxProducersPerTopic")
+ @ApiOperation(value = " Set maxProducersPerTopic configuration on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 412, message = "maxProducersPerTopic value is not valid") })
+ public void setMaxProducersPerTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace, int maxProducersPerTopic) {
+ validateNamespaceName(property, cluster, namespace);
+ internalSetMaxProducersPerTopic(maxProducersPerTopic);
+ }
+
+ @GET
+ @Path("/{property}/{cluster}/{namespace}/maxConsumersPerTopic")
+ @ApiOperation(value = "Get maxConsumersPerTopic config on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist") })
+ public int getMaxConsumersPerTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(property, cluster, namespace);
+ return internalGetMaxConsumersPerTopic();
+ }
+
+ @POST
+ @Path("/{property}/{cluster}/{namespace}/maxConsumersPerTopic")
+ @ApiOperation(value = " Set maxConsumersPerTopic configuration on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 412, message = "maxConsumersPerTopic value is not valid") })
+ public void setMaxConsumersPerTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace, int maxConsumersPerTopic) {
+ validateNamespaceName(property, cluster, namespace);
+ internalSetMaxConsumersPerTopic(maxConsumersPerTopic);
+ }
+
+ @GET
+ @Path("/{property}/{cluster}/{namespace}/maxConsumersPerSubscription")
+ @ApiOperation(value = "Get maxConsumersPerSubscription config on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist") })
+ public int getMaxConsumersPerSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(property, cluster, namespace);
+ return internalGetMaxConsumersPerSubscription();
+ }
+
+ @POST
+ @Path("/{property}/{cluster}/{namespace}/maxConsumersPerSubscription")
+ @ApiOperation(value = " Set maxConsumersPerSubscription configuration on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 412, message = "maxConsumersPerSubscription value is not valid") })
+ public void setMaxConsumersPerSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace, int maxConsumersPerSubscription) {
+ validateNamespaceName(property, cluster, namespace);
+ internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription);
+ }
+
+
private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 873d761..c984e76 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -504,6 +504,78 @@ public class Namespaces extends NamespacesBase {
internalModifyEncryptionRequired(encryptionRequired);
}
+ @GET
+ @Path("/{property}/{namespace}/maxProducersPerTopic")
+ @ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist") })
+ public int getMaxProducersPerTopic(@PathParam("property") String property,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(property, namespace);
+ return internalGetMaxProducersPerTopic();
+ }
+
+ @POST
+ @Path("/{property}/{namespace}/maxProducersPerTopic")
+ @ApiOperation(value = " Set maxProducersPerTopic configuration on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 412, message = "maxProducersPerTopic value is not valid") })
+ public void setMaxProducersPerTopic(@PathParam("property") String property, @PathParam("namespace") String namespace,
+ int maxProducersPerTopic) {
+ validateNamespaceName(property, namespace);
+ internalSetMaxProducersPerTopic(maxProducersPerTopic);
+ }
+
+ @GET
+ @Path("/{property}/{namespace}/maxConsumersPerTopic")
+ @ApiOperation(value = "Get maxConsumersPerTopic config on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist") })
+ public int getMaxConsumersPerTopic(@PathParam("property") String property,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(property, namespace);
+ return internalGetMaxConsumersPerTopic();
+ }
+
+ @POST
+ @Path("/{property}/{namespace}/maxConsumersPerTopic")
+ @ApiOperation(value = " Set maxConsumersPerTopic configuration on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 412, message = "maxConsumersPerTopic value is not valid") })
+ public void setMaxConsumersPerTopic(@PathParam("property") String property, @PathParam("namespace") String namespace,
+ int maxConsumersPerTopic) {
+ validateNamespaceName(property, namespace);
+ internalSetMaxConsumersPerTopic(maxConsumersPerTopic);
+ }
+
+ @GET
+ @Path("/{property}/{namespace}/maxConsumersPerSubscription")
+ @ApiOperation(value = "Get maxConsumersPerSubscription config on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist") })
+ public int getMaxConsumersPerSubscription(@PathParam("property") String property,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(property, namespace);
+ return internalGetMaxConsumersPerSubscription();
+ }
+
+ @POST
+ @Path("/{property}/{namespace}/maxConsumersPerSubscription")
+ @ApiOperation(value = " Set maxConsumersPerSubscription configuration on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 412, message = "maxConsumersPerSubscription value is not valid") })
+ public void setMaxConsumersPerSubscription(@PathParam("property") String property, @PathParam("namespace") String namespace,
+ int maxConsumersPerSubscription) {
+ validateNamespaceName(property, namespace);
+ internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription);
+ }
+
private Policies getDefaultPolicesIfNull(Policies policies) {
if (policies != null) {
return policies;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 5b68c67..092dc36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry;
import java.util.List;
@@ -25,10 +26,13 @@ import java.util.List;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.policies.data.Policies;
public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher {
@@ -63,7 +67,17 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
}
protected boolean isConsumersExceededOnTopic() {
- final int maxConsumersPerTopic = serviceConfig.getMaxConsumersPerTopic();
+ Policies policies;
+ try {
+ policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+ .orElseGet(() -> new Policies());
+ } catch (Exception e) {
+ policies = new Policies();
+ }
+ final int maxConsumersPerTopic = policies.max_consumers_per_topic > 0 ?
+ policies.max_consumers_per_topic :
+ serviceConfig.getMaxConsumersPerTopic();
if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= topic.getNumberOfConsumers()) {
return true;
}
@@ -71,7 +85,17 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
}
protected boolean isConsumersExceededOnSubscription() {
- final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
+ Policies policies;
+ try {
+ policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+ .orElseGet(() -> new Policies());
+ } catch (Exception e) {
+ policies = new Policies();
+ }
+ final int maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
+ policies.max_consumers_per_subscription :
+ serviceConfig.getMaxConsumersPerSubscription();
if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
return true;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index a03ac6c..d8510df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -265,7 +265,17 @@ public class NonPersistentTopic implements Topic {
}
private boolean isProducersExceeded() {
- final int maxProducers = brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
+ Policies policies;
+ try {
+ policies = brokerService.pulsar().getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace()))
+ .orElseGet(() -> new Policies());
+ } catch (Exception e) {
+ policies = new Policies();
+ }
+ final int maxProducers = policies.max_producers_per_topic > 0 ?
+ policies.max_producers_per_topic :
+ brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
if (maxProducers > 0 && maxProducers <= producers.size()) {
return true;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 5189465..2e59811 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;
import static java.util.stream.Collectors.toSet;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsExcep
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
@@ -44,6 +46,8 @@ import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
@@ -132,7 +136,17 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
}
private boolean isConsumersExceededOnTopic() {
- final int maxConsumersPerTopic = serviceConfig.getMaxConsumersPerTopic();
+ Policies policies;
+ try {
+ policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, DestinationName.get(topic.getName()).getNamespace()))
+ .orElseGet(() -> new Policies());
+ } catch (Exception e) {
+ policies = new Policies();
+ }
+ final int maxConsumersPerTopic = policies.max_consumers_per_topic > 0 ?
+ policies.max_consumers_per_topic :
+ serviceConfig.getMaxConsumersPerTopic();
if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= topic.getNumberOfConsumers()) {
return true;
}
@@ -140,7 +154,17 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
}
private boolean isConsumersExceededOnSubscription() {
- final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
+ Policies policies;
+ try {
+ policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, DestinationName.get(topic.getName()).getNamespace()))
+ .orElseGet(() -> new Policies());
+ } catch (Exception e) {
+ policies = new Policies();
+ }
+ final int maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
+ policies.max_consumers_per_subscription :
+ serviceConfig.getMaxConsumersPerSubscription();
if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) {
return true;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index a26d862..bd197e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -19,9 +19,11 @@
package org.apache.pulsar.broker.service.persistent;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledFuture;
@@ -33,12 +35,15 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadE
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,7 +119,17 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
}
protected boolean isConsumersExceededOnTopic() {
- final int maxConsumersPerTopic = serviceConfig.getMaxConsumersPerTopic();
+ Policies policies;
+ try {
+ policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+ .orElseGet(() -> new Policies());
+ } catch (Exception e) {
+ policies = new Policies();
+ }
+ final int maxConsumersPerTopic = policies.max_consumers_per_topic > 0 ?
+ policies.max_consumers_per_topic :
+ serviceConfig.getMaxConsumersPerTopic();
if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= topic.getNumberOfConsumers()) {
return true;
}
@@ -122,7 +137,17 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
}
protected boolean isConsumersExceededOnSubscription() {
- final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
+ Policies policies;
+ try {
+ policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+ .orElseGet(() -> new Policies());
+ } catch (Exception e) {
+ policies = new Policies();
+ }
+ final int maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
+ policies.max_consumers_per_subscription :
+ serviceConfig.getMaxConsumersPerSubscription();
if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
return true;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6e6b5d8..4d5d974 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -348,7 +348,17 @@ public class PersistentTopic implements Topic, AddEntryCallback {
}
private boolean isProducersExceeded() {
- final int maxProducers = brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
+ Policies policies;
+ try {
+ policies = brokerService.pulsar().getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace()))
+ .orElseGet(() -> new Policies());
+ } catch (Exception e) {
+ policies = new Policies();
+ }
+ final int maxProducers = policies.max_producers_per_topic > 0 ?
+ policies.max_producers_per_topic :
+ brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
if (maxProducers > 0 && maxProducers <= producers.size()) {
return true;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index b19b053..d0277ef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -365,15 +365,8 @@ public class PersistentTopicTest {
topic.removeProducer(producer); /* noop */
}
- @Test
public void testMaxProducers() throws Exception {
- // set max clients
- ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
- doReturn(2).when(svcConfig).getMaxProducersPerTopic();
- doReturn(svcConfig).when(pulsar).getConfiguration();
-
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-
String role = "appid1";
// 1. add producer1
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role, false, null);
@@ -396,6 +389,28 @@ public class PersistentTopicTest {
}
@Test
+ public void testMaxProducersForBroker() throws Exception {
+ // set max clients
+ ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+ doReturn(2).when(svcConfig).getMaxProducersPerTopic();
+ doReturn(svcConfig).when(pulsar).getConfiguration();
+ testMaxProducers();
+ }
+
+ @Test
+ public void testMaxProducersForNamespace() throws Exception {
+ ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+ doReturn(svcConfig).when(pulsar).getConfiguration();
+ // set max clients
+ Policies policies = new Policies();
+ policies.max_producers_per_topic = 2;
+ when(pulsar.getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace())))
+ .thenReturn(Optional.of(policies));
+ testMaxProducers();
+ }
+
+ @Test
public void testSubscribeFail() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
@@ -477,14 +492,7 @@ public class PersistentTopicTest {
}
}
- @Test
public void testMaxConsumersShared() throws Exception {
- // set max clients
- ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
- doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
- doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
- doReturn(svcConfig).when(pulsar).getConfiguration();
-
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock);
@@ -548,13 +556,34 @@ public class PersistentTopicTest {
}
@Test
- public void testMaxConsumersFailover() throws Exception {
+ public void testMaxConsumersSharedForBroker() throws Exception {
// set max clients
ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
doReturn(svcConfig).when(pulsar).getConfiguration();
+ testMaxConsumersShared();
+ }
+
+ @Test
+ public void testMaxConsumersSharedForNamespace() throws Exception {
+ ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+ doReturn(svcConfig).when(pulsar).getConfiguration();
+
+ // set max clients
+ Policies policies = new Policies();
+ policies.max_consumers_per_subscription = 2;
+ policies.max_consumers_per_topic = 3;
+ when(pulsar.getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace())))
+ .thenReturn(Optional.of(policies));
+
+ testMaxConsumersShared();
+ }
+
+ public void testMaxConsumersFailover() throws Exception {
+
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock);
@@ -618,6 +647,33 @@ public class PersistentTopicTest {
}
@Test
+ public void testMaxConsumersFailoverForBroker() throws Exception {
+ // set max clients
+ ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+ doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
+ doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
+ doReturn(svcConfig).when(pulsar).getConfiguration();
+
+ testMaxConsumersFailover();
+ }
+
+ @Test
+ public void testMaxConsumersFailoverForNamespace() throws Exception {
+ ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+ doReturn(svcConfig).when(pulsar).getConfiguration();
+
+ // set max clients
+ Policies policies = new Policies();
+ policies.max_consumers_per_subscription = 2;
+ policies.max_consumers_per_topic = 3;
+ when(pulsar.getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace())))
+ .thenReturn(Optional.of(policies));
+
+ testMaxConsumersFailover();
+ }
+
+ @Test
public void testUbsubscribeRaceConditions() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 65ce200..c8d3204 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -917,4 +917,136 @@ public interface Namespaces {
* @throws PulsarAdminException
*/
void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscriptionAuthMode) throws PulsarAdminException;
+
+ /**
+ * Get the maxProducersPerTopic for a namespace.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>0</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ int getMaxProducersPerTopic(String namespace) throws PulsarAdminException;
+
+ /**
+ * Set maxProducersPerTopic for a namespace.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>10</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param maxProducersPerTopic
+ * maxProducersPerTopic value for a namespace
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setMaxProducersPerTopic(String namespace, int maxProducersPerTopic) throws PulsarAdminException;
+
+ /**
+ * Get the maxProducersPerTopic for a namespace.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>0</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ int getMaxConsumersPerTopic(String namespace) throws PulsarAdminException;
+
+ /**
+ * Set maxConsumersPerTopic for a namespace.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>10</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param maxConsumersPerTopic
+ * maxConsumersPerTopic value for a namespace
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setMaxConsumersPerTopic(String namespace, int maxConsumersPerTopic) throws PulsarAdminException;
+
+ /**
+ * Get the maxConsumersPerSubscription for a namespace.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>0</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException;
+
+ /**
+ * Set maxConsumersPerSubscription for a namespace.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>10</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param maxConsumersPerSubscription
+ * maxConsumersPerSubscription value for a namespace
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setMaxConsumersPerSubscription(String namespace, int maxConsumersPerSubscription) throws PulsarAdminException;
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 6db9ab0..6d3ff3d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -534,4 +534,73 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
throw getApiException(e);
}
}
+
+ @Override
+ public int getMaxProducersPerTopic(String namespace) throws PulsarAdminException {
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ return request(
+ namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxProducersPerTopic"))
+ .get(Integer.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void setMaxProducersPerTopic(String namespace, int maxProducersPerTopic) throws PulsarAdminException {
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxProducersPerTopic"))
+ .post(Entity.entity(maxProducersPerTopic, MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public int getMaxConsumersPerTopic(String namespace) throws PulsarAdminException {
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ return request(
+ namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxConsumersPerTopic"))
+ .get(Integer.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void setMaxConsumersPerTopic(String namespace, int maxConsumersPerTopic) throws PulsarAdminException {
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxConsumersPerTopic"))
+ .post(Entity.entity(maxConsumersPerTopic, MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException {
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ return request(
+ namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxConsumersPerSubscription"))
+ .get(Integer.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void setMaxConsumersPerSubscription(String namespace, int maxConsumersPerSubscription) throws PulsarAdminException {
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("maxConsumersPerSubscription"))
+ .post(Entity.entity(maxConsumersPerSubscription, MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 0f93708..9d93bca 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -611,6 +611,87 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get maxProducersPerTopic for a namespace")
+ private class GetMaxProducersPerTopic extends CliCommand {
+ @Parameter(description = "property/cluster/namespace\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ print(admin.namespaces().getMaxProducersPerTopic(namespace));
+ }
+ }
+
+ @Parameters(commandDescription = "Set maxProducersPerTopic for a namespace")
+ private class SetMaxProducersPerTopic extends CliCommand {
+ @Parameter(description = "property/cluster/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--max-producers-per-topic", "-p" }, description = "maxProducersPerTopic for a namespace", required = true)
+ private int maxProducersPerTopic;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().setMaxProducersPerTopic(namespace, maxProducersPerTopic);
+ }
+ }
+
+ @Parameters(commandDescription = "Get maxConsumersPerTopic for a namespace")
+ private class GetMaxConsumersPerTopic extends CliCommand {
+ @Parameter(description = "property/cluster/namespace\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ print(admin.namespaces().getMaxConsumersPerTopic(namespace));
+ }
+ }
+
+ @Parameters(commandDescription = "Set maxConsumersPerTopic for a namespace")
+ private class SetMaxConsumersPerTopic extends CliCommand {
+ @Parameter(description = "property/cluster/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--max-consumers-per-topic", "-c" }, description = "maxConsumersPerTopic for a namespace", required = true)
+ private int maxConsumersPerTopic;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().setMaxConsumersPerTopic(namespace, maxConsumersPerTopic);
+ }
+ }
+
+ @Parameters(commandDescription = "Get maxConsumersPerSubscription for a namespace")
+ private class GetMaxConsumersPerSubscription extends CliCommand {
+ @Parameter(description = "property/cluster/namespace\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ print(admin.namespaces().getMaxConsumersPerSubscription(namespace));
+ }
+ }
+
+ @Parameters(commandDescription = "Set maxConsumersPerSubscription for a namespace")
+ private class SetMaxConsumersPerSubscription extends CliCommand {
+ @Parameter(description = "property/cluster/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--max-consumers-per-subscription", "-c" }, description = "maxConsumersPerSubscription for a namespace", required = true)
+ private int maxConsumersPerSubscription;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().setMaxConsumersPerSubscription(namespace, maxConsumersPerSubscription);
+ }
+ }
+
private static long validateSizeString(String s) {
char last = s.charAt(s.length() - 1);
String subStr = s.substring(0, s.length() - 1);
@@ -711,5 +792,12 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("set-encryption-required", new SetEncryptionRequired());
jcommander.addCommand("set-subscription-auth-mode", new SetSubscriptionAuthMode());
+
+ jcommander.addCommand("get-max-producers-per-topic", new GetMaxProducersPerTopic());
+ jcommander.addCommand("set-max-producers-per-topic", new SetMaxProducersPerTopic());
+ jcommander.addCommand("get-max-consumers-per-topic", new GetMaxConsumersPerTopic());
+ jcommander.addCommand("set-max-consumers-per-topic", new SetMaxConsumersPerTopic());
+ jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
+ jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription());
}
}
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index cf90477..46fa7f7 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -365,6 +365,28 @@ public class PulsarAdminToolTest {
namespaces.run(split("unsubscribe -b 0x80000000_0xffffffff -s my-sub myprop/clust/ns1"));
verify(mockNamespaces).unsubscribeNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff", "my-sub");
+
+ mockNamespaces = mock(Namespaces.class);
+ when(admin.namespaces()).thenReturn(mockNamespaces);
+ namespaces = new CmdNamespaces(admin);
+
+ namespaces.run(split("get-max-producers-per-topic myprop/clust/ns1"));
+ verify(mockNamespaces).getMaxProducersPerTopic("myprop/clust/ns1");
+
+ namespaces.run(split("set-max-producers-per-topic myprop/clust/ns1 -p 1"));
+ verify(mockNamespaces).setMaxProducersPerTopic("myprop/clust/ns1", 1);
+
+ namespaces.run(split("get-max-consumers-per-topic myprop/clust/ns1"));
+ verify(mockNamespaces).getMaxConsumersPerTopic("myprop/clust/ns1");
+
+ namespaces.run(split("set-max-consumers-per-topic myprop/clust/ns1 -c 2"));
+ verify(mockNamespaces).setMaxConsumersPerTopic("myprop/clust/ns1", 2);
+
+ namespaces.run(split("get-max-consumers-per-subscription myprop/clust/ns1"));
+ verify(mockNamespaces).getMaxConsumersPerSubscription("myprop/clust/ns1");
+
+ namespaces.run(split("set-max-consumers-per-subscription myprop/clust/ns1 -c 3"));
+ verify(mockNamespaces).setMaxConsumersPerSubscription("myprop/clust/ns1", 3);
}
@Test
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index f3486b8..03a8ce3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -50,6 +50,10 @@ public class Policies {
public boolean encryption_required = false;
public SubscriptionAuthMode subscription_auth_mode = SubscriptionAuthMode.None;
+ public int max_producers_per_topic = 0;
+ public int max_consumers_per_topic = 0;
+ public int max_consumers_per_subscription = 0;
+
@Override
public boolean equals(Object obj) {
if (obj instanceof Policies) {
@@ -65,7 +69,10 @@ public class Policies {
&& Objects.equals(retention_policies, other.retention_policies)
&& Objects.equals(encryption_required, other.encryption_required)
&& Objects.equals(subscription_auth_mode, other.subscription_auth_mode)
- && Objects.equals(antiAffinityGroup, other.antiAffinityGroup);
+ && Objects.equals(antiAffinityGroup, other.antiAffinityGroup)
+ && max_producers_per_topic == other.max_producers_per_topic
+ && max_consumers_per_topic == other.max_consumers_per_topic
+ && max_consumers_per_subscription == other.max_consumers_per_subscription;
}
return false;
@@ -92,6 +99,9 @@ public class Policies {
.add("message_ttl_in_seconds", message_ttl_in_seconds).add("retention_policies", retention_policies)
.add("deleted", deleted)
.add("encryption_required", encryption_required)
- .add("subscription_auth_mode", subscription_auth_mode).toString();
+ .add("subscription_auth_mode", subscription_auth_mode)
+ .add("max_producers_per_topic", max_producers_per_topic)
+ .add("max_consumers_per_topic", max_consumers_per_topic)
+ .add("max_consumers_per_subscription", max_consumers_per_topic).toString();
}
}
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.