You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/09/11 12:48:07 UTC
[pulsar] branch master updated: support max consumers per
subscription on topic level (#8003)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e4b12df support max consumers per subscription on topic level (#8003)
e4b12df is described below
commit e4b12df27c1c8edd44db18e1a6f433393cc46977
Author: hangc0276 <ha...@163.com>
AuthorDate: Fri Sep 11 20:47:44 2020 +0800
support max consumers per subscription on topic level (#8003)
### Modifications
Support set max consumers per subscription on topic level.
Support get max consumers per subscription on topic level.
Support remove max consumers per subscription on topic level.
### Verifying this change
This change added tests and can be verified as follows:
- test set topic max consumers per subscription
- test remove topic max consumers per subscription
- test get topic max consumers per subscription
- test disabled topic max consumers per subscription
---
.../broker/admin/impl/PersistentTopicsBase.java | 42 +++++++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 87 +++++++++++++++++++++
.../pulsar/broker/service/BrokerService.java | 32 ++++++++
...onPersistentDispatcherSingleActiveConsumer.java | 35 ++++++---
.../PersistentDispatcherMultipleConsumers.java | 32 +++++---
.../PersistentDispatcherSingleActiveConsumer.java | 32 +++++---
.../broker/admin/TopicPoliciesDisableTest.java | 27 +++++++
.../pulsar/broker/admin/TopicPoliciesTest.java | 40 +++++++++-
.../org/apache/pulsar/client/admin/Topics.java | 88 ++++++++++++++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 78 +++++++++++++++++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 51 ++++++++++++-
11 files changed, 510 insertions(+), 34 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 085ad12..9a81e9a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3260,6 +3260,48 @@ public class PersistentTopicsBase extends AdminResource {
}
+ protected Optional<Integer> internalGetMaxConsumersPerSubscription() {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ return getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumersPerSubscription);
+ }
+
+ protected CompletableFuture<Void> internalSetMaxConsumersPerSubscription(Integer maxConsumersPerSubscription) {
+ if (maxConsumersPerSubscription != null && maxConsumersPerSubscription < 0) {
+ throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for maxConsumersPerSubscription");
+ }
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+
+ TopicPolicies topicPolicies = getTopicPolicies(topicName)
+ .orElseGet(TopicPolicies::new);
+ topicPolicies.setMaxConsumersPerSubscription(maxConsumersPerSubscription);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ }
+
+ protected CompletableFuture<Void> internalRemoveMaxConsumersPerSubscription() {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
+ if (!topicPolicies.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ topicPolicies.get().setMaxConsumersPerSubscription(null);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+ }
+
protected Optional<Long> internalGetCompactionThreshold() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index cc59ec4..68a12a5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2174,6 +2174,93 @@ public class PersistentTopics extends PersistentTopicsBase {
}
@GET
+ @Path("/{tenant}/{namespace}/{topic}/maxConsumersPerSubscription")
+ @ApiOperation(value = "Get max consumers per subscription configuration for specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void getMaxConsumersPerSubscription(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ try {
+ Optional<Integer> maxConsumersPerSubscription = internalGetMaxConsumersPerSubscription();
+ if (!maxConsumersPerSubscription.isPresent()) {
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ asyncResponse.resume(maxConsumersPerSubscription.get());
+ }
+ } catch (RestException e) {
+ asyncResponse.resume(e);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/maxConsumersPerSubscription")
+ @ApiOperation(value = "Set max consumers per subscription configuration for specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void setMaxConsumersPerSubscription(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Max consumers per subscription for the specified topic") int maxConsumersPerSubscription) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription).whenComplete((r, ex) -> {
+ if (ex instanceof RestException) { // already carries an HTTP status: hand it back unchanged
+ log.error("Failed to set topic {} max consumers per subscription ", topicName.getLocalName(), ex);
+ asyncResponse.resume(ex);
+ } else if (ex != null) { // any other failure is wrapped in a RestException before being returned
+ log.error("Failed to set topic {} max consumers per subscription", topicName.getLocalName(), ex);
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ try {
+ log.info("[{}] Successfully set topic max consumers per subscription: tenant={}, namespace={}, topic={}, maxConsumersPerSubscription={}",
+ clientAppId(),
+ tenant,
+ namespace,
+ topicName.getLocalName(),
+ jsonMapper().writeValueAsString(maxConsumersPerSubscription));
+ } catch (JsonProcessingException ignore) {} // only the log line is lost; the response below is still sent
+ asyncResponse.resume(Response.noContent().build());
+ }
+ });
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/maxConsumersPerSubscription")
+ @ApiOperation(value = "Remove max consumers per subscription configuration for specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void removeMaxConsumersPerSubscription(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalRemoveMaxConsumersPerSubscription().whenComplete((r, ex) -> {
+ if (ex != null) { // every failure is wrapped in a RestException before being returned
+ log.error("Failed to remove topic {} max consumers per subscription", topicName.getLocalName(), ex);
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ log.info("[{}] Successfully remove topic max consumers per subscription: tenant={}, namespace={}, topic={}",
+ clientAppId(),
+ tenant,
+ namespace,
+ topicName.getLocalName());
+ asyncResponse.resume(Response.noContent().build()); // success is reported as 204 No Content
+ }
+ });
+ }
+
+ @GET
@Path("/{tenant}/{namespace}/{topic}/publishRate")
@ApiOperation(value = "Get publish rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1980419..e633c88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import javax.ws.rs.core.Response;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.commons.collections.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -149,6 +150,7 @@ import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -2361,6 +2363,36 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
return SystemTopicClient.isSystemTopic(TopicName.get(topic));
}
+ /**
+ * Get {@link TopicPolicies} for this topic.
+ * @param topicName the topic to look up; a partition is resolved to its partitioned topic name first
+ * @return the TopicPolicies if they exist, otherwise null (also when the cache is not ready or policies are disabled)
+ */
+ public TopicPolicies getTopicPolicies(TopicName topicName) {
+ TopicName cloneTopicName = topicName;
+ if (topicName.isPartitioned()) {
+ cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
+ }
+ try {
+ checkTopicLevelPolicyEnable();
+ return pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName);
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+ log.warn("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
+ return null;
+ } catch (RestException | NullPointerException e) { // logged at debug: the dispatchers call this on each consumer-limit check
+ log.debug("Topic level policies are not enabled. " +
+ "Please refer to systemTopicEnabled and topicLevelPoliciesEnabled on broker.conf", e);
+ return null;
+ }
+ }
+
+ private void checkTopicLevelPolicyEnable() {
+ if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+ throw new RestException(Response.Status.METHOD_NOT_ALLOWED,
+ "Topic level policies is disabled, to enable the topic level policy and retry.");
+ }
+ }
+
public void setInterceptor(BrokerInterceptor interceptor) {
this.interceptor = interceptor;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 8c642bf..7063990 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import java.util.List;
@@ -32,12 +34,14 @@ import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
+@Slf4j
public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher {
private final NonPersistentTopic topic;
@@ -77,21 +81,32 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
}
protected boolean isConsumersExceededOnSubscription() {
- Policies policies;
+ Policies policies = null;
+ Integer maxConsumersPerSubscription = null;
try {
- // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
- policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
- .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
-
- if (policies == null) {
- policies = new Policies();
+ maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
+ .getTopicPolicies(TopicName.get(topicName)))
+ .map(TopicPolicies::getMaxConsumersPerSubscription)
+ .orElse(null);
+ if (maxConsumersPerSubscription == null) {
+ // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
+ policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+ .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
+
+ if (policies == null) {
+ policies = new Policies();
+ }
}
} catch (Exception e) {
policies = new Policies();
}
- final int maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
- policies.max_consumers_per_subscription :
- serviceConfig.getMaxConsumersPerSubscription();
+
+ if (maxConsumersPerSubscription == null) {
+ maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
+ policies.max_consumers_per_subscription :
+ serviceConfig.getMaxConsumersPerSubscription();
+ }
+
if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
return true;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 5a453e1..a053681 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -43,7 +43,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadE
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.logging.log4j.util.Strings;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
@@ -65,6 +64,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
import org.apache.pulsar.common.util.collections.LongPairSet;
@@ -154,20 +154,32 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
private boolean isConsumersExceededOnSubscription() {
- Policies policies;
+ Policies policies = null;
+ Integer maxConsumersPerSubscription = null;
try {
- // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
- policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
- .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
- if (policies == null) {
- policies = new Policies();
+ maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
+ .getTopicPolicies(TopicName.get(topic.getName())))
+ .map(TopicPolicies::getMaxConsumersPerSubscription)
+ .orElse(null);
+
+ if (maxConsumersPerSubscription == null) {
+ // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
+ policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+ .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
+ if (policies == null) {
+ policies = new Policies();
+ }
}
} catch (Exception e) {
policies = new Policies();
}
- final int maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
- policies.max_consumers_per_subscription :
- serviceConfig.getMaxConsumersPerSubscription();
+
+ if (maxConsumersPerSubscription == null) {
+ maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
+ policies.max_consumers_per_subscription :
+ serviceConfig.getMaxConsumersPerSubscription();
+ }
+
if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) {
return true;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 2d33f55..0e6d747 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -135,21 +136,32 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
}
protected boolean isConsumersExceededOnSubscription() {
- Policies policies;
+ Policies policies = null;
+ Integer maxConsumersPerSubscription = null;
try {
- // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
- policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
- .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
-
- if (policies == null) {
- policies = new Policies();
+ maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
+ .getTopicPolicies(TopicName.get(topicName)))
+ .map(TopicPolicies::getMaxConsumersPerSubscription)
+ .orElse(null);
+ if (maxConsumersPerSubscription == null) {
+ // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
+ policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
+ .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
+
+ if (policies == null) {
+ policies = new Policies();
+ }
}
} catch (Exception e) {
policies = new Policies();
}
- final int maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
- policies.max_consumers_per_subscription :
- serviceConfig.getMaxConsumersPerSubscription();
+
+ if (maxConsumersPerSubscription == null) {
+ maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 ?
+ policies.max_consumers_per_subscription :
+ serviceConfig.getMaxConsumersPerSubscription();
+ }
+
if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
return true;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
index 96f874a..fec0395 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
@@ -173,6 +173,33 @@ public class TopicPoliciesDisableTest extends MockedPulsarServiceBaseTest {
}
@Test
+ public void testMaxConsumersPerSubscription() throws Exception {
+ int maxConsumersPerSubscription = 10;
+ log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", maxConsumersPerSubscription, testTopic);
+
+ try {
+ admin.topics().setMaxConsumersPerSubscription(testTopic, maxConsumersPerSubscription);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 405);
+ }
+
+ try {
+ admin.topics().getMaxConsumersPerSubscription(testTopic);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 405);
+ }
+
+ try {
+ admin.topics().removeMaxConsumersPerSubscription(testTopic);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 405);
+ }
+ }
+
+ @Test
public void testPublishRateDisabled() throws Exception {
PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 684a102..ef6eb40 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -589,14 +589,52 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
admin.topics().removeCompactionThreshold(testTopic);
Thread.sleep(3000);
- log.info("Compaction threshold get on topic: {} after remove", getCompactionThreshold, testTopic);
getCompactionThreshold = admin.topics().getCompactionThreshold(testTopic);
+ log.info("Compaction threshold get on topic: {} after remove", getCompactionThreshold, testTopic);
Assert.assertNull(getCompactionThreshold);
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
+ public void testGetSetMaxConsumersPerSubscription() throws Exception {
+ Integer maxConsumersPerSubscription = 10;
+ log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", maxConsumersPerSubscription, testTopic);
+
+ admin.topics().setMaxConsumersPerSubscription(testTopic, maxConsumersPerSubscription);
+ log.info("MaxConsumersPerSubscription set success on topic: {}", testTopic);
+
+ Thread.sleep(3000);
+ Integer getMaxConsumersPerSubscription = admin.topics().getMaxConsumersPerSubscription(testTopic);
+ log.info("MaxConsumersPerSubscription: {} get on topic: {}", getMaxConsumersPerSubscription, testTopic);
+ Assert.assertEquals(getMaxConsumersPerSubscription, maxConsumersPerSubscription);
+
+ admin.topics().deletePartitionedTopic(testTopic, true);
+ }
+
+ @Test
+ public void testRemoveMaxConsumersPerSubscription() throws Exception {
+ Integer maxConsumersPerSubscription = 10;
+ log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", maxConsumersPerSubscription, testTopic);
+
+ admin.topics().setMaxConsumersPerSubscription(testTopic, maxConsumersPerSubscription);
+ log.info("MaxConsumersPerSubscription set success on topic: {}", testTopic);
+
+ Thread.sleep(3000);
+ Integer getMaxConsumersPerSubscription = admin.topics().getMaxConsumersPerSubscription(testTopic);
+ log.info("MaxConsumersPerSubscription: {} get on topic: {}", getMaxConsumersPerSubscription, testTopic);
+ Assert.assertEquals(getMaxConsumersPerSubscription, maxConsumersPerSubscription);
+
+ admin.topics().removeMaxConsumersPerSubscription(testTopic);
+ Thread.sleep(3000);
+ getMaxConsumersPerSubscription = admin.topics().getMaxConsumersPerSubscription(testTopic);
+ log.info("MaxConsumersPerSubscription: {} get on topic: {} after remove", getMaxConsumersPerSubscription, testTopic);
+ Assert.assertNull(getMaxConsumersPerSubscription); // after removal the getter is expected to return null
+
+ admin.topics().deletePartitionedTopic(testTopic, true);
+ }
+
+ @Test
public void testGetSetPublishRate() throws Exception {
PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index c27baf6..282a3a4 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2159,7 +2159,95 @@ public interface Topics {
*/
CompletableFuture<Void> removePublishRateAsync(String topic) throws PulsarAdminException;
+ /**
+ * Get the maxConsumersPerSubscription for a topic.
+ * <p/>
+ * Response example:
+ *
+ * <pre>
+ * <code>0</code>
+ * </pre>
+ *
+ * @param topic
+ * Topic name
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Topic does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ Integer getMaxConsumersPerSubscription(String topic) throws PulsarAdminException;
+
+ /**
+ * Get the maxConsumersPerSubscription for a topic asynchronously.
+ * <p/>
+ * Response example:
+ *
+ * <pre>
+ * <code>0</code>
+ * </pre>
+ *
+ * @param topic
+ * Topic name
+ */
+ CompletableFuture<Integer> getMaxConsumersPerSubscriptionAsync(String topic);
+
+ /**
+ * Set maxConsumersPerSubscription for a topic.
+ * <p/>
+ * Request example:
+ *
+ * <pre>
+ * <code>10</code>
+ * </pre>
+ *
+ * @param topic
+ * Topic name
+ * @param maxConsumersPerSubscription
+ * maxConsumersPerSubscription value for a topic
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Topic does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setMaxConsumersPerSubscription(String topic, int maxConsumersPerSubscription) throws PulsarAdminException;
+
+ /**
+ * Set maxConsumersPerSubscription for a topic asynchronously.
+ * <p/>
+ * Request example:
+ *
+ * <pre>
+ * <code>10</code>
+ * </pre>
+ *
+ * @param topic
+ * Topic name
+ * @param maxConsumersPerSubscription
+ * maxConsumersPerSubscription value for a topic
+ */
+ CompletableFuture<Void> setMaxConsumersPerSubscriptionAsync(String topic, int maxConsumersPerSubscription);
+ /**
+ * Remove the maxConsumersPerSubscription for a topic.
+ * @param topic
+ * Topic name
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void removeMaxConsumersPerSubscription(String topic) throws PulsarAdminException;
+
+ /**
+ * Remove the maxConsumersPerSubscription for a topic asynchronously.
+ * @param topic
+ * Topic name
+ */
+ CompletableFuture<Void> removeMaxConsumersPerSubscriptionAsync(String topic);
/**
* Get the max number of producer for specified topic.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 7e7233f..ca4e786 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2316,6 +2316,84 @@ public class TopicsImpl extends BaseResource implements Topics {
}
@Override
+ public Integer getMaxConsumersPerSubscription(String topic) throws PulsarAdminException {
+ try {
+ return getMaxConsumersPerSubscriptionAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Integer> getMaxConsumersPerSubscriptionAsync(String topic) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "maxConsumersPerSubscription");
+ final CompletableFuture<Integer> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Integer>() {
+ @Override
+ public void completed(Integer maxConsumersPerSubscription) {
+ future.complete(maxConsumersPerSubscription);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setMaxConsumersPerSubscription(String topic, int maxConsumersPerSubscription)
+ throws PulsarAdminException {
+ try {
+ setMaxConsumersPerSubscriptionAsync(topic, maxConsumersPerSubscription)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setMaxConsumersPerSubscriptionAsync(String topic, int maxConsumersPerSubscription) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "maxConsumersPerSubscription");
+ return asyncPostRequest(path, Entity.entity(maxConsumersPerSubscription, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void removeMaxConsumersPerSubscription(String topic) throws PulsarAdminException {
+ try {
+ removeMaxConsumersPerSubscriptionAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeMaxConsumersPerSubscriptionAsync(String topic) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "maxConsumersPerSubscription");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public Integer getMaxProducers(String topic) throws PulsarAdminException {
try {
return getMaxProducersAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 01e0394..54bf2e7 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -147,9 +147,15 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-publish-rate", new GetPublishRate());
jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
- jcommander.addCommand("get-max-producers", new GetMaxProducers());
- jcommander.addCommand("set-max-producers", new SetMaxProducers());
- jcommander.addCommand("remove-max-producers", new RemoveMaxProducers());
+
+ jcommander.addCommand("get-maxProducers", new GetMaxProducers());
+ jcommander.addCommand("set-maxProducers", new SetMaxProducers());
+ jcommander.addCommand("remove-maxProducers", new RemoveMaxProducers());
+
+ jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
+ jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription());
+ jcommander.addCommand("remove-max-consumers-per-subscription", new RemoveMaxConsumersPerSubscription());
+
jcommander.addCommand("get-inactive-topic-policies", new GetInactiveTopicPolicies());
jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
jcommander.addCommand("remove-inactive-topic-policies", new RemoveInactiveTopicPolicies());
@@ -1505,6 +1511,45 @@ public class CmdTopics extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get max consumers per subscription for a topic")
+ private class GetMaxConsumersPerSubscription extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(admin.topics().getMaxConsumersPerSubscription(persistentTopic));
+ }
+ }
+
+ @Parameters(commandDescription = "Set max consumers per subscription for a topic")
+ private class SetMaxConsumersPerSubscription extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--max-consumers-per-subscription", "-c" }, description = "maxConsumersPerSubscription for a topic", required = true)
+ private int maxConsumersPerSubscription;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params); // topic name taken from the positional argument
+ admin.topics().setMaxConsumersPerSubscription(persistentTopic, maxConsumersPerSubscription);
+ }
+ }
+
+ @Parameters(commandDescription = "Remove max consumers per subscription for a topic")
+ private class RemoveMaxConsumersPerSubscription extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().removeMaxConsumersPerSubscription(persistentTopic);
+ }
+ }
+
@Parameters(commandDescription = "Get the inactive topic policies on a topic")
private class GetInactiveTopicPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)