You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/09/21 03:36:55 UTC
[pulsar] branch master updated: Support subscribe rate on topic level (#7991)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new afe6413 Support subscribe rate on topic level (#7991)
afe6413 is described below
commit afe641350838b0a215b53ca5fbcb7990d7b45857
Author: hangc0276 <ha...@163.com>
AuthorDate: Mon Sep 21 11:34:43 2020 +0800
Support subscribe rate on topic level (#7991)
Support set subscribe rate on topic level.
Support get subscribe rate on topic level.
Support remove subscribe rate on topic level.
---
.../broker/admin/impl/PersistentTopicsBase.java | 43 +++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 88 ++++++++++
.../broker/service/persistent/PersistentTopic.java | 32 +++-
.../service/persistent/SubscribeRateLimiter.java | 76 ++++++---
.../broker/admin/TopicPoliciesDisableTest.java | 21 +++
.../pulsar/broker/admin/TopicPoliciesTest.java | 179 +++++++++++++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 64 +++++++-
.../pulsar/client/admin/internal/TopicsImpl.java | 78 +++++++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 51 ++++++
.../pulsar/common/policies/data/TopicPolicies.java | 5 +
10 files changed, 614 insertions(+), 23 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9a81e9a..749c493 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin.impl;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import static org.apache.pulsar.common.util.Codec.decode;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -3386,4 +3387,46 @@ public class PersistentTopicsBase extends AdminResource {
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}
+    /**
+     * Reads the subscribe rate stored in the topic-level policies of the current topic.
+     *
+     * <p>The tenant-admin, policies-read-only and (for global topics) namespace-ownership
+     * validations run first, followed by the topic-level-policy check.
+     *
+     * @return the stored subscribe rate, or an empty Optional when the topic has no policies
+     *         or its policies carry no subscribe rate
+     */
+    protected Optional<SubscribeRate> internalGetSubscribeRate() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+
+        final Optional<TopicPolicies> currentPolicies = getTopicPolicies(topicName);
+        // Optional.map yields an empty Optional when the mapped value is null.
+        return currentPolicies.map(policies -> policies.getSubscribeRate());
+    }
+
+    /**
+     * Stores the given subscribe rate in the topic-level policies of the current topic.
+     *
+     * @param subscribeRate the rate to store; when null nothing is written
+     * @return a future completed by the topic-policies service once the update is done, or an
+     *         already-completed future when {@code subscribeRate} is null
+     */
+    protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate subscribeRate) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+
+        if (subscribeRate != null) {
+            // Start from the existing policies so other topic-level settings are preserved.
+            final TopicPolicies updated = getTopicPolicies(topicName).orElseGet(() -> new TopicPolicies());
+            updated.setSubscribeRate(subscribeRate);
+            return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, updated);
+        }
+        // Nothing to store: report success without touching the policies.
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /**
+     * Clears the subscribe rate from the topic-level policies of the current topic.
+     *
+     * @return a future completed by the topic-policies service once the update is done, or an
+     *         already-completed future when the topic has no topic-level policies
+     */
+    protected CompletableFuture<Void> internalRemoveSubscribeRate() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+
+        final Optional<TopicPolicies> existing = getTopicPolicies(topicName);
+        if (existing.isPresent()) {
+            final TopicPolicies policies = existing.get();
+            policies.setSubscribeRate(null);
+            return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
+        }
+        // No policies stored for this topic, so there is nothing to remove.
+        return CompletableFuture.completedFuture(null);
+    }
+
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 68a12a5..5e93c91 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.v2;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import static org.apache.pulsar.common.util.Codec.decode;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -2347,5 +2348,92 @@ public class PersistentTopics extends PersistentTopicsBase {
});
}
+    /**
+     * REST endpoint returning the subscribe rate stored in the topic-level policies of a topic.
+     *
+     * <p>Resumes the response with the {@link SubscribeRate} when one is set and with an empty
+     * "no content" response otherwise. Exceptions raised while reading are handed to the
+     * async response, wrapped in a {@link RestException} unless they already are one.
+     */
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/subscribeRate")
+    @ApiOperation(value = "Get subscribe rate configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getSubscribeRate(@Suspended final AsyncResponse asyncResponse,
+                                 @PathParam("tenant") String tenant,
+                                 @PathParam("namespace") String namespace,
+                                 @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            final Optional<SubscribeRate> rate = internalGetSubscribeRate();
+            if (rate.isPresent()) {
+                asyncResponse.resume(rate.get());
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+            }
+        } catch (Exception e) {
+            // RestException is passed through as-is; anything else is wrapped.
+            asyncResponse.resume(e instanceof RestException ? e : new RestException(e));
+        }
+    }
+
+    /**
+     * REST endpoint storing a subscribe rate in the topic-level policies of a topic.
+     *
+     * <p>Resumes the response with "no content" on success. On failure the exception is
+     * logged together with the topic and handed to the async response, wrapped in a
+     * {@link RestException} unless it already is one.
+     *
+     * @param subscribeRate the subscribe rate to store for the topic
+     */
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/subscribeRate")
+    @ApiOperation(value = "Set subscribe rate configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void setSubscribeRate(@Suspended final AsyncResponse asyncResponse,
+                                 @PathParam("tenant") String tenant,
+                                 @PathParam("namespace") String namespace,
+                                 @PathParam("topic") @Encoded String encodedTopic,
+                                 @ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetSubscribeRate(subscribeRate).whenComplete((r, ex) -> {
+            if (ex != null) {
+                // Always log the topic and the cause, whatever the exception type.
+                log.error("Failed to set topic {} subscribe rate", topicName.getLocalName(), ex);
+                asyncResponse.resume(ex instanceof RestException ? ex : new RestException(ex));
+                return;
+            }
+
+            // Serialization problems must not hide the success log; fall back to toString().
+            String rateDescription;
+            try {
+                rateDescription = jsonMapper().writeValueAsString(subscribeRate);
+            } catch (JsonProcessingException e) {
+                rateDescription = String.valueOf(subscribeRate);
+            }
+            log.info("[{}] Successfully set topic subscribe rate: tenant={}, namespace={}, topic={}, subscribeRate={}",
+                    clientAppId(),
+                    tenant,
+                    namespace,
+                    topicName.getLocalName(),
+                    rateDescription);
+            asyncResponse.resume(Response.noContent().build());
+        });
+    }
+
+    /**
+     * REST endpoint removing the subscribe rate from the topic-level policies of a topic.
+     *
+     * <p>Resumes the response with "no content" on success; on failure the exception is
+     * logged and handed to the async response wrapped in a {@link RestException}.
+     */
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/subscribeRate")
+    @ApiOperation(value = "Remove subscribe rate configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeSubscribeRate(@Suspended final AsyncResponse asyncResponse,
+                                    @PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace,
+                                    @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalRemoveSubscribeRate().whenComplete((r, ex) -> {
+            if (ex != null) {
+                log.error("Failed to remove topic {} subscribe rate ", topicName.getLocalName(), ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully remove topic subscribe rate: tenant={}, namespace={}, topic={}",
+                        clientAppId(),
+                        tenant,
+                        namespace,
+                        topicName.getLocalName());
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 292e070..f83677e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -230,7 +230,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
initializeDispatchRateLimiterIfNeeded(Optional.empty());
- brokerService.getPulsar().getTopicPoliciesService().registerListener(TopicName.get(topic), this);
+ registerTopicPolicyListener();
+
this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
@@ -581,6 +582,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
future.completeExceptionally(new NotAllowedException("Subscribe limited by subscribe rate limit per consumer."));
return future;
}
+
}
lock.readLock().lock();
@@ -2403,6 +2405,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
, cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled());
}
+
+ initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
+ if (this.subscribeRateLimiter.isPresent() && policies != null) {
+ subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
+ subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
+ }
}
private Optional<Policies> getNamespacePolicies(){
@@ -2423,6 +2431,15 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
}
+    /**
+     * Creates the subscribe rate limiter when none exists yet and the supplied topic
+     * policies carry a subscribe rate.
+     *
+     * @param policies topic-level policies, possibly empty
+     */
+    private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
+        // NOTE(review): the monitor is the Optional currently held by the field, and the field
+        // is reassigned below, so later callers lock a different object - consider a dedicated
+        // final lock field.
+        synchronized (subscribeRateLimiter) {
+            if (subscribeRateLimiter.isPresent()) {
+                return;
+            }
+            final boolean hasTopicLevelRate = policies.isPresent()
+                    && policies.get().getSubscribeRate() != null;
+            if (hasTopicLevelRate) {
+                this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
+            }
+        }
+    }
+
private PersistentTopic getPersistentTopic() {
return this;
}
@@ -2454,6 +2471,19 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
}
+    /**
+     * Registers this topic as a topic-policies listener, but only when both the system topic
+     * and topic-level policies are enabled in the broker configuration.
+     *
+     * <p>For a partition the listener is registered under the partitioned topic's name.
+     */
+    private void registerTopicPolicyListener() {
+        final boolean topicPoliciesActive = brokerService.pulsar().getConfig().isSystemTopicEnabled()
+                && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled();
+        if (!topicPoliciesActive) {
+            return;
+        }
+
+        final TopicName parsedName = TopicName.get(topic);
+        final TopicName listenerKey = parsedName.isPartitioned()
+                ? TopicName.get(parsedName.getPartitionedTopicName())
+                : parsedName;
+        brokerService.getPulsar().getTopicPoliciesService().registerListener(listenerKey, this);
+    }
+
@VisibleForTesting
public MessageDeduplication getMessageDeduplication() {
return messageDeduplication;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index ee07250..30f13fc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -23,8 +23,10 @@ import com.google.common.base.MoreObjects;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +52,17 @@ public class SubscribeRateLimiter {
this.brokerService = topic.getBrokerService();
subscribeRateLimiter = new ConcurrentHashMap<>();
this.executorService = brokerService.pulsar().getExecutor();
- this.subscribeRate = getPoliciesSubscribeRate();
+ // get subscribeRate from topic level policies
+ this.subscribeRate = Optional.ofNullable(brokerService.getTopicPolicies(TopicName.get(this.topicName)))
+ .map(TopicPolicies::getSubscribeRate)
+ .orElse(null);
+
+ // subscribeRate of topic level policies not set, get from zookeeper
+ if (this.subscribeRate == null) {
+ this.subscribeRate = getPoliciesSubscribeRate();
+ }
+
+ // get subscribeRate from broker.conf
if (this.subscribeRate == null) {
this.subscribeRate = new SubscribeRate(brokerService.pulsar().getConfiguration().getSubscribeThrottlingRatePerConsumer(),
brokerService.pulsar().getConfiguration().getSubscribeRatePeriodPerConsumerInSecond());
@@ -95,9 +107,10 @@ public class SubscribeRateLimiter {
* default broker subscribe-throttling-rate
*/
private synchronized void addSubscribeLimiterIfAbsent(ConsumerIdentifier consumerIdentifier) {
- if (subscribeRateLimiter.get(consumerIdentifier) != null) {
+ if (subscribeRateLimiter.get(consumerIdentifier) != null || !isSubscribeRateEnabled(this.subscribeRate)) {
return;
}
+
updateSubscribeRate(consumerIdentifier, this.subscribeRate);
}
@@ -134,31 +147,52 @@ public class SubscribeRateLimiter {
}
public void onPoliciesUpdate(Policies data) {
+ // if subscribe rate is set on topic policy, skip subscribe rate update
+ SubscribeRate subscribeRate = Optional.ofNullable(brokerService.getTopicPolicies(TopicName.get(topicName)))
+ .map(TopicPolicies::getSubscribeRate)
+ .orElse(null);
+ if (subscribeRate != null) {
+ return;
+ }
String cluster = brokerService.pulsar().getConfiguration().getClusterName();
- SubscribeRate subscribeRate = data.clusterSubscribeRate.get(cluster);
+ subscribeRate = data.clusterSubscribeRate.get(cluster);
- // update dispatch-rate only if it's configured in policies else ignore
- if (subscribeRate != null) {
- final SubscribeRate newSubscribeRate = new SubscribeRate(
- brokerService.pulsar().getConfiguration().getSubscribeThrottlingRatePerConsumer(),
- brokerService.pulsar().getConfiguration().getSubscribeRatePeriodPerConsumerInSecond()
- );
- // if policy-throttling rate is disabled and cluster-throttling is enabled then apply
- // cluster-throttling rate
- if (!isSubscribeRateEnabled(subscribeRate) && isSubscribeRateEnabled(newSubscribeRate)) {
- subscribeRate = newSubscribeRate;
- }
- this.subscribeRate = subscribeRate;
- stopResetTask();
- for (ConsumerIdentifier consumerIdentifier : this.subscribeRateLimiter.keySet()) {
+ onSubscribeRateUpdate(subscribeRate);
+
+ }
+
+ public void onSubscribeRateUpdate(SubscribeRate subscribeRate) {
+ final SubscribeRate namespacePolicySubscribeRate = getPoliciesSubscribeRate();
+ final SubscribeRate newSubscribeRate = new SubscribeRate(
+ brokerService.pulsar().getConfiguration().getSubscribeThrottlingRatePerConsumer(),
+ brokerService.pulsar().getConfiguration().getSubscribeRatePeriodPerConsumerInSecond()
+ );
+
+ // if policy-throttling rate is disabled and cluster-throttling is enabled then apply
+ // cluster-throttling rate
+ // if topic policy-throttling rate is disabled
+ if (!isSubscribeRateEnabled(subscribeRate) && isSubscribeRateEnabled(namespacePolicySubscribeRate)) {
+ subscribeRate = namespacePolicySubscribeRate;
+ }
+
+ if (!isSubscribeRateEnabled(subscribeRate) && !isSubscribeRateEnabled(namespacePolicySubscribeRate)
+ && isSubscribeRateEnabled(newSubscribeRate)) {
+ subscribeRate = newSubscribeRate;
+ }
+ this.subscribeRate = subscribeRate;
+ stopResetTask();
+ for (ConsumerIdentifier consumerIdentifier : this.subscribeRateLimiter.keySet()) {
+ if (!isSubscribeRateEnabled(this.subscribeRate)) {
+ removeSubscribeLimiter(consumerIdentifier);
+ } else {
updateSubscribeRate(consumerIdentifier, subscribeRate);
}
- if (isSubscribeRateEnabled(this.subscribeRate)) {
- this.resetTask = createTask();
- log.info("[{}] configured subscribe-dispatch rate at broker {}", this.topicName, subscribeRate);
- }
+ }
+ if (isSubscribeRateEnabled(this.subscribeRate)) {
+ this.resetTask = createTask();
+ log.info("[{}] configured subscribe-dispatch rate at broker {}", this.topicName, subscribeRate);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
index fec0395..f0f8d79 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -254,4 +255,24 @@ public class TopicPoliciesDisableTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(e.getStatusCode(), 405);
}
}
+
+    /**
+     * Setting and getting a topic-level subscribe rate must both be rejected with HTTP 405
+     * in this test class.
+     */
+    @Test
+    public void testSubscribeRateDisabled() throws Exception {
+        final SubscribeRate rate = new SubscribeRate(10, 30);
+        log.info("Subscribe Rate: {} will set to the topic: {}", rate, testTopic);
+
+        // Write path.
+        try {
+            admin.topics().setSubscribeRate(testTopic, rate);
+            Assert.fail();
+        } catch (PulsarAdminException rejected) {
+            final int status = rejected.getStatusCode();
+            Assert.assertEquals(status, 405);
+        }
+
+        // Read path.
+        try {
+            admin.topics().getSubscribeRate(testTopic);
+            Assert.fail();
+        } catch (PulsarAdminException rejected) {
+            final int status = rejected.getStatusCode();
+            Assert.assertEquals(status, 405);
+        }
+    }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index ef6eb40..c0e7655 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.admin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import static org.testng.Assert.assertEquals;
import com.google.common.collect.Sets;
@@ -788,4 +790,181 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
admin.topics().deletePartitionedTopic(persistenceTopic, true);
admin.topics().deletePartitionedTopic(testTopic, true);
}
+
+    /**
+     * Sets a namespace-level and then a topic-level subscribe rate, checks that the third
+     * subscribe attempt on the topic is rejected, and that the topic-level rate reads back.
+     */
+    @Test
+    public void testGetSetSubscribeRate() throws Exception {
+        admin.topics().createPartitionedTopic(persistenceTopic, 2);
+        Producer producer = pulsarClient.newProducer().topic(testTopic).create();
+        producer.close();
+
+        // Namespace-level rate first ...
+        final SubscribeRate namespaceRate = new SubscribeRate(1, 30);
+        log.info("Subscribe Rate: {} will be set to the namespace: {}", namespaceRate, myNamespace);
+        admin.namespaces().setSubscribeRate(myNamespace, namespaceRate);
+        log.info("Subscribe Rate set success on namespace: {}", myNamespace);
+        Thread.sleep(3000);
+
+        // ... then the topic-level rate under test.
+        final SubscribeRate topicRate = new SubscribeRate(2, 30);
+        log.info("Subscribe Rate: {} will set to the topic: {}", topicRate, persistenceTopic);
+        admin.topics().setSubscribeRate(persistenceTopic, topicRate);
+        log.info("Subscribe Rate set success on topic: {}", persistenceTopic);
+
+        Thread.sleep(3000);
+
+        final PulsarClient[] admittedClients = {
+                newPulsarClient(lookupUrl.toString(), 0),
+                newPulsarClient(lookupUrl.toString(), 0)
+        };
+        final PulsarClient rejectedClient = newPulsarClient(lookupUrl.toString(), 0);
+
+        // The first two subscribe attempts are expected to succeed.
+        for (PulsarClient client : admittedClients) {
+            try {
+                Consumer admitted = client.newConsumer().subscriptionName("sub1")
+                        .topic(persistenceTopic).consumerName("test").subscribe();
+                Assert.assertNotNull(admitted);
+                admitted.close();
+                client.shutdown();
+            } catch (PulsarClientException e) {
+                Assert.fail();
+            }
+        }
+
+        // The third attempt is expected to be rejected.
+        Consumer rejected = null;
+        try {
+            rejected = rejectedClient.newConsumer().subscriptionName("sub1")
+                    .topic(persistenceTopic).consumerName("test").subscribe();
+            Assert.fail();
+        } catch (PulsarClientException e) {
+            log.info("subscribe rate reached max subscribe rate limit");
+        }
+
+        Assert.assertNull(rejected);
+        rejectedClient.shutdown();
+
+        final SubscribeRate storedRate = admin.topics().getSubscribeRate(persistenceTopic);
+        log.info("Subscribe Rate: {} get on topic: {}", storedRate, persistenceTopic);
+        Assert.assertEquals(storedRate, topicRate);
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+        admin.topics().deletePartitionedTopic(persistenceTopic, true);
+    }
+
+    /**
+     * Sets a topic-level subscribe rate, checks that the third subscribe attempt is rejected,
+     * removes the rate, and checks that four further subscribe attempts all succeed.
+     */
+    @Test
+    public void testRemoveSubscribeRate() throws Exception {
+        admin.topics().createPartitionedTopic(persistenceTopic, 2);
+        Producer producer = pulsarClient.newProducer().topic(testTopic).create();
+        producer.close();
+
+        SubscribeRate subscribeRate = new SubscribeRate(2, 30);
+        log.info("Subscribe Rate: {} will set to the topic: {}", subscribeRate, persistenceTopic);
+        admin.topics().setSubscribeRate(persistenceTopic, subscribeRate);
+        log.info("Subscribe Rate set success on topic: {}", persistenceTopic);
+
+        // Presumably gives the policy update time to take effect on the broker.
+        Thread.sleep(3000);
+
+        PulsarClient pulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);
+        PulsarClient pulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);
+        PulsarClient pulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);
+
+        // With the rate in place the first two attempts succeed and the third is rejected.
+        assertSubscribeThenShutdown(pulsarClient1, "sub1");
+        assertSubscribeThenShutdown(pulsarClient2, "sub1");
+
+        Consumer<?> consumer3 = null;
+        try {
+            consumer3 = pulsarClient3.newConsumer().subscriptionName("sub1")
+                    .topic(persistenceTopic).consumerName("test").subscribe();
+            Assert.fail();
+        } catch (PulsarClientException e) {
+            log.info("subscribe rate reached max subscribe rate limit");
+        }
+        Assert.assertNull(consumer3);
+
+        SubscribeRate getSubscribeRate = admin.topics().getSubscribeRate(persistenceTopic);
+        log.info("Subscribe Rate: {} get on topic: {}", getSubscribeRate, persistenceTopic);
+        Assert.assertEquals(getSubscribeRate, subscribeRate);
+
+        admin.topics().removeSubscribeRate(persistenceTopic);
+        Thread.sleep(3000);
+        // Re-read first so the log shows the post-removal value, one placeholder per argument.
+        getSubscribeRate = admin.topics().getSubscribeRate(persistenceTopic);
+        log.info("Subscribe Rate: {} get on topic: {} after remove", getSubscribeRate, persistenceTopic);
+        Assert.assertNull(getSubscribeRate);
+
+        PulsarClient pulsarClient4 = newPulsarClient(lookupUrl.toString(), 0);
+        PulsarClient pulsarClient5 = newPulsarClient(lookupUrl.toString(), 0);
+        PulsarClient pulsarClient6 = newPulsarClient(lookupUrl.toString(), 0);
+
+        // After removal, four attempts in a row (including the previously rejected client) succeed.
+        assertSubscribeThenShutdown(pulsarClient3, "sub2");
+        assertSubscribeThenShutdown(pulsarClient4, "sub2");
+        assertSubscribeThenShutdown(pulsarClient5, "sub2");
+        assertSubscribeThenShutdown(pulsarClient6, "sub2");
+
+        admin.topics().deletePartitionedTopic(persistenceTopic, true);
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
+
+    /**
+     * Subscribes to {@code persistenceTopic} with the given client and subscription, asserts
+     * that it worked, then closes the consumer and shuts the client down; fails the test on
+     * any PulsarClientException.
+     */
+    private void assertSubscribeThenShutdown(PulsarClient client, String subscriptionName) {
+        try {
+            Consumer<?> consumer = client.newConsumer().subscriptionName(subscriptionName)
+                    .topic(persistenceTopic).consumerName("test").subscribe();
+            Assert.assertNotNull(consumer);
+            consumer.close();
+            client.shutdown();
+        } catch (PulsarClientException e) {
+            Assert.fail();
+        }
+    }
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 282a3a4..1994ee2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicStats;
/**
* Admin interface for Topics management.
@@ -2301,7 +2302,6 @@ public interface Topics {
*/
CompletableFuture<Void> removeMaxProducersAsync(String topic);
-
/**
* Get the max number of consumer for specified topic.
*
@@ -2353,4 +2353,66 @@ public interface Topics {
* @param topic Topic name
*/
CompletableFuture<Void> removeMaxConsumersAsync(String topic);
+
+ /**
+ * Set the subscribe rate for a topic (consumer subscribe attempts on the topic are limited by it).
+ *
+ * @param topic
+ * Topic name
+ * @param subscribeRate
+ * Subscribe rate that limits consumer subscribe attempts on the topic
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setSubscribeRate(String topic, SubscribeRate subscribeRate) throws PulsarAdminException;
+
+ /**
+ * Set the subscribe rate for a topic asynchronously (consumer subscribe attempts on the topic are limited by it).
+ *
+ * @param topic
+ * Topic name
+ * @param subscribeRate
+ * Subscribe rate that limits consumer subscribe attempts on the topic
+ * @return a future for the outcome of the request
+ */
+ CompletableFuture<Void> setSubscribeRateAsync(String topic, SubscribeRate subscribeRate);
+
+ /**
+ * Get the subscribe rate of a topic (how many times a consumer may subscribe within a period).
+ *
+ * @param topic
+ * Topic name
+ * @return the subscribe rate set on the topic; presumably null when none is set (TODO confirm)
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ SubscribeRate getSubscribeRate(String topic) throws PulsarAdminException;
+
+ /**
+ * Get the subscribe rate of a topic asynchronously.
+ * <p>
+ * The subscribe rate is how many times a consumer may subscribe to the topic within a period.
+ *
+ * @param topic
+ * Topic name
+ * @return a future holding the subscribe rate set on the topic
+ */
+ CompletableFuture<SubscribeRate> getSubscribeRateAsync(String topic);
+
+ /**
+ * Remove the subscribe rate set on a topic.
+ *
+ * @param topic
+ * Topic name
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void removeSubscribeRate(String topic) throws PulsarAdminException;
+
+ /**
+ * Remove the subscribe rate set on a topic asynchronously.
+ *
+ * NOTE(review): the other async methods in this change declare no checked exception and the
+ * TopicsImpl override omits this throws clause - confirm whether it is intended.
+ *
+ * @param topic
+ * Topic name
+ * @return a future for the outcome of the request
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ CompletableFuture<Void> removeSubscribeRateAsync(String topic) throws PulsarAdminException;
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index ca4e786..eb30d9c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -79,6 +79,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
@@ -2545,5 +2546,82 @@ public class TopicsImpl extends BaseResource implements Topics {
return asyncDeleteRequest(path);
}
+
+    /**
+     * Blocking variant of {@link #getSubscribeRateAsync(String)}: waits up to
+     * {@code readTimeoutMs} milliseconds for the result.
+     */
+    @Override
+    public SubscribeRate getSubscribeRate(String topic) throws PulsarAdminException {
+        final CompletableFuture<SubscribeRate> pending = getSubscribeRateAsync(topic);
+        try {
+            return pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException timedOut) {
+            throw new PulsarAdminException.TimeoutException(timedOut);
+        } catch (InterruptedException interrupted) {
+            // Restore the interrupt flag before reporting the failure.
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(interrupted);
+        } catch (ExecutionException failed) {
+            throw (PulsarAdminException) failed.getCause();
+        }
+    }
+
+    /**
+     * Issues an asynchronous GET on the topic's "subscribeRate" path and exposes the outcome
+     * as a future; failures are converted with {@code getApiException}.
+     */
+    @Override
+    public CompletableFuture<SubscribeRate> getSubscribeRateAsync(String topic) {
+        final CompletableFuture<SubscribeRate> result = new CompletableFuture<>();
+        final WebTarget target = topicPath(validateTopic(topic), "subscribeRate");
+        asyncGetRequest(target, new InvocationCallback<SubscribeRate>() {
+            @Override
+            public void completed(SubscribeRate rate) {
+                result.complete(rate);
+            }
+
+            @Override
+            public void failed(Throwable error) {
+                result.completeExceptionally(getApiException(error.getCause()));
+            }
+        });
+        return result;
+    }
+
+    /**
+     * Blocking variant of {@link #setSubscribeRateAsync(String, SubscribeRate)}: waits up to
+     * {@code readTimeoutMs} milliseconds for completion.
+     */
+    @Override
+    public void setSubscribeRate(String topic, SubscribeRate subscribeRate) throws PulsarAdminException {
+        final CompletableFuture<Void> pending = setSubscribeRateAsync(topic, subscribeRate);
+        try {
+            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException timedOut) {
+            throw new PulsarAdminException.TimeoutException(timedOut);
+        } catch (InterruptedException interrupted) {
+            // Restore the interrupt flag before reporting the failure.
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(interrupted);
+        } catch (ExecutionException failed) {
+            throw (PulsarAdminException) failed.getCause();
+        }
+    }
+
+    /**
+     * Posts the subscribe rate as JSON to the topic's "subscribeRate" path.
+     */
+    @Override
+    public CompletableFuture<Void> setSubscribeRateAsync(String topic, SubscribeRate subscribeRate) {
+        final WebTarget target = topicPath(validateTopic(topic), "subscribeRate");
+        final Entity<SubscribeRate> payload = Entity.entity(subscribeRate, MediaType.APPLICATION_JSON);
+        return asyncPostRequest(target, payload);
+    }
+
+    /**
+     * Blocking variant of {@link #removeSubscribeRateAsync(String)}: waits up to
+     * {@code readTimeoutMs} milliseconds for completion.
+     */
+    @Override
+    public void removeSubscribeRate(String topic) throws PulsarAdminException {
+        final CompletableFuture<Void> pending = removeSubscribeRateAsync(topic);
+        try {
+            pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException timedOut) {
+            throw new PulsarAdminException.TimeoutException(timedOut);
+        } catch (InterruptedException interrupted) {
+            // Restore the interrupt flag before reporting the failure.
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(interrupted);
+        } catch (ExecutionException failed) {
+            throw (PulsarAdminException) failed.getCause();
+        }
+    }
+
+    /**
+     * Issues an asynchronous DELETE on the topic's "subscribeRate" path.
+     */
+    @Override
+    public CompletableFuture<Void> removeSubscribeRateAsync(String topic) {
+        final WebTarget target = topicPath(validateTopic(topic), "subscribeRate");
+        return asyncDeleteRequest(target);
+    }
+
private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 54bf2e7..9649cad 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.util.RelativeTimeUtil;
@Parameters(commandDescription = "Operations on persistent topics")
@@ -159,9 +160,14 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-inactive-topic-policies", new GetInactiveTopicPolicies());
jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
jcommander.addCommand("remove-inactive-topic-policies", new RemoveInactiveTopicPolicies());
+
jcommander.addCommand("get-max-consumers", new GetMaxConsumers());
jcommander.addCommand("set-max-consumers", new SetMaxConsumers());
jcommander.addCommand("remove-max-consumers", new RemoveMaxConsumers());
+
+ jcommander.addCommand("get-subscribe-rate", new GetSubscribeRate());
+ jcommander.addCommand("set-subscribe-rate", new SetSubscribeRate());
+ jcommander.addCommand("remove-subscribe-rate", new RemoveSubscribeRate());
}
@Parameters(commandDescription = "Get the list of topics under a namespace.")
@@ -1650,4 +1656,49 @@ public class CmdTopics extends CmdBase {
admin.topics().removeMaxConsumers(persistentTopic);
}
}
+
+ /** CLI command: validates the topic argument and prints what {@code getSubscribeRate} returns for it. */
+ @Parameters(commandDescription = "Get consumer subscribe rate for a topic")
+ private class GetSubscribeRate extends CliCommand {
+     @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+     private java.util.List<String> params;
+
+     @Override
+     void run() throws PulsarAdminException {
+         // Validate first so a malformed topic argument fails before any admin call is made.
+         final String topic = validatePersistentTopic(params);
+         final SubscribeRate currentRate = admin.topics().getSubscribeRate(topic);
+         print(currentRate);
+     }
+ }
+
+ @Parameters(commandDescription = "Set consumer subscribe rate for a topic")
+ private class SetSubscribeRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--subscribe-rate",
+ "-sr" }, description = "subscribe-rate (default -1 will be overwrite if not passed)\n", required = false)
+ private int subscribeRate = -1;
+
+ @Parameter(names = { "--subscribe-rate-period",
+ "-st" }, description = "subscribe-rate-period in second type (default 30 second will be overwrite if not passed)\n", required = false)
+ private int subscribeRatePeriodSec = 30;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().setSubscribeRate(persistentTopic,
+ new SubscribeRate(subscribeRate, subscribeRatePeriodSec));
+ }
+ }
+
+ /** CLI command: validates the topic argument and calls {@code removeSubscribeRate} for it. */
+ @Parameters(commandDescription = "Remove consumer subscribe rate for a topic")
+ private class RemoveSubscribeRate extends CliCommand {
+     @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+     private java.util.List<String> params;
+
+     @Override
+     void run() throws PulsarAdminException {
+         // Validate first so a malformed topic argument fails before any admin call is made.
+         final String topic = validatePersistentTopic(params);
+         admin.topics().removeSubscribeRate(topic);
+     }
+ }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 343770d..fb785b6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -54,6 +54,7 @@ public class TopicPolicies {
private DispatchRate dispatchRate = null;
private Long compactionThreshold = null;
private PublishRate publishRate = null;
+ private SubscribeRate subscribeRate = null;
public boolean isInactiveTopicPoliciesSet() {
return inactiveTopicPolicies != null;
@@ -122,4 +123,8 @@ public class TopicPolicies {
public boolean isPublishRateSet() {
return publishRate != null;
}
+
+ /**
+  * Reports whether a subscribe rate is present in these policies.
+  *
+  * @return {@code true} when the {@code subscribeRate} field is non-null, {@code false} otherwise
+  */
+ public boolean isSubscribeRateSet() {
+     final boolean present = subscribeRate != null;
+     return present;
+ }
}