You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/09/21 23:47:35 UTC
[pulsar] branch master updated: support subscription dispatch rate
on topic level (#8087)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4c508c5 support subscription dispatch rate on topic level (#8087)
4c508c5 is described below
commit 4c508c586fb9dcd08d7e13dfe4aeca19a790f51c
Author: hangc0276 <ha...@163.com>
AuthorDate: Tue Sep 22 07:47:21 2020 +0800
support subscription dispatch rate on topic level (#8087)
### Modifications
Support set subscription dispatch rate on topic level.
Support get subscription dispatch rate on topic level.
Support remove subscription dispatch rate on topic level.
---
.../broker/admin/impl/PersistentTopicsBase.java | 42 +++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 87 ++++++++++++++
.../service/persistent/DispatchRateLimiter.java | 34 +++---
.../broker/service/persistent/PersistentTopic.java | 16 ++-
.../broker/admin/TopicPoliciesDisableTest.java | 21 ++++
.../pulsar/broker/admin/TopicPoliciesTest.java | 128 +++++++++++++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 64 +++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 76 ++++++++++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 59 ++++++++++
.../pulsar/common/policies/data/TopicPolicies.java | 5 +
10 files changed, 518 insertions(+), 14 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 749c493..13c592e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3261,6 +3261,48 @@ public class PersistentTopicsBase extends AdminResource {
}
+ protected Optional<DispatchRate> internalGetSubscriptionDispatchRate() {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ return getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionDispatchRate);
+ }
+
+ protected CompletableFuture<Void> internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ if (dispatchRate == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ TopicPolicies topicPolicies = getTopicPolicies(topicName)
+ .orElseGet(TopicPolicies::new);
+ topicPolicies.setSubscriptionDispatchRate(dispatchRate);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ }
+
+ protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate() {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
+ if (!topicPolicies.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ topicPolicies.get().setSubscriptionDispatchRate(null);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+ }
+
+
protected Optional<Integer> internalGetMaxConsumersPerSubscription() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 5e93c91..6762538 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2088,6 +2088,93 @@ public class PersistentTopics extends PersistentTopicsBase {
}
@GET
+ @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
+ @ApiOperation(value = "Get subscription message dispatch rate configuration for specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void getSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ try {
+ Optional<DispatchRate> dispatchRate = internalGetSubscriptionDispatchRate();
+ if (!dispatchRate.isPresent()) {
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ asyncResponse.resume(dispatchRate.get());
+ }
+ } catch (RestException e) {
+ asyncResponse.resume(e);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+
+ /**
+  * Sets the topic-level subscription dispatch rate and resumes the suspended response once the
+  * asynchronous policy update completes.
+  *
+  * <p>Resumes with 204 (No Content) on success; on failure resumes with the {@link RestException}
+  * itself, or with any other throwable wrapped in a {@link RestException}.
+  */
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
+ @ApiOperation(value = "Set subscription message dispatch rate configuration for specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+         @ApiResponse(code = 404, message = "Topic does not exist"),
+         @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+         @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void setSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse,
+         @PathParam("tenant") String tenant,
+         @PathParam("namespace") String namespace,
+         @PathParam("topic") @Encoded String encodedTopic,
+         @ApiParam(value = "Subscription message dispatch rate for the specified topic") DispatchRate dispatchRate) {
+     validateTopicName(tenant, namespace, encodedTopic);
+     internalSetSubscriptionDispatchRate(dispatchRate).whenComplete((r, ex) -> {
+         if (ex instanceof RestException) {
+             log.error("Failed to set topic: {} subscription dispatch rate", topicName.getLocalName(), ex);
+             asyncResponse.resume(ex);
+         } else if (ex != null) {
+             // Pass the throwable to the logger so the cause and stack trace are not lost.
+             log.error("Failed to set topic: {} subscription dispatch rate", topicName.getLocalName(), ex);
+             asyncResponse.resume(new RestException(ex));
+         } else {
+             try {
+                 log.info("[{}] Successfully set topic subscription dispatch rate: tenant={}, namespace={}, topic={}, dispatchRate={}",
+                         clientAppId(),
+                         tenant,
+                         namespace,
+                         topicName.getLocalName(),
+                         jsonMapper().writeValueAsString(dispatchRate));
+             } catch (JsonProcessingException e) {
+                 // Serialization is only needed for the log line; the policy update itself succeeded,
+                 // so record the problem and still answer the request.
+                 log.warn("[{}] Failed to serialize subscription dispatch rate for logging: topic={}",
+                         clientAppId(), topicName.getLocalName(), e);
+             }
+             asyncResponse.resume(Response.noContent().build());
+         }
+     });
+ }
+
+ /**
+  * Removes the topic-level subscription dispatch rate and resumes the suspended response once the
+  * asynchronous policy update completes.
+  *
+  * <p>Resumes with 204 (No Content) on success; on failure resumes with the {@link RestException}
+  * itself, or with any other throwable wrapped in a {@link RestException}.
+  */
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
+ @ApiOperation(value = "Remove subscription message dispatch rate configuration for specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+         @ApiResponse(code = 404, message = "Topic does not exist"),
+         @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+         @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void removeSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse,
+         @PathParam("tenant") String tenant,
+         @PathParam("namespace") String namespace,
+         @PathParam("topic") @Encoded String encodedTopic) {
+     validateTopicName(tenant, namespace, encodedTopic);
+     internalRemoveSubscriptionDispatchRate().whenComplete((r, ex) -> {
+         if (ex != null) {
+             log.error("Failed to remove topic: {} subscription dispatch rate", topicName.getLocalName(), ex);
+             // Forward a RestException as-is, matching the set endpoint; wrap anything else.
+             asyncResponse.resume(ex instanceof RestException ? ex : new RestException(ex));
+         } else {
+             log.info("[{}] Successfully remove topic subscription dispatch rate: tenant={}, namespace={}, topic={}",
+                     clientAppId(),
+                     tenant,
+                     namespace,
+                     topicName.getLocalName());
+             asyncResponse.resume(Response.noContent().build());
+         }
+     });
+ }
+
+ @GET
@Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
@ApiOperation(value = "Get compaction threshold configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 4cc711f..29d1107 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -149,7 +149,7 @@ public class DispatchRateLimiter {
* default broker dispatch-throttling-rate
*/
public void updateDispatchRate() {
- Optional<DispatchRate> dispatchRate = getSystemTopicDispatchRate(brokerService, topicName);
+ Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
if (!dispatchRate.isPresent()) {
dispatchRate =Optional.ofNullable(getPoliciesDispatchRate(brokerService));
@@ -165,29 +165,37 @@ public class DispatchRateLimiter {
public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
String topicName, Type type) {
final ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
- if (serviceConfig.isTopicLevelPoliciesEnabled() && type == Type.TOPIC) {
- Optional<DispatchRate> dispatchRate = getSystemTopicDispatchRate(brokerService, topicName);
- if (dispatchRate.isPresent()) {
- return true;
- }
+ Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
+ if (dispatchRate.isPresent()) {
+ return true;
}
policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName);
return isDispatchRateNeeded(serviceConfig, policies, topicName, type);
}
- public static Optional<DispatchRate> getSystemTopicDispatchRate(BrokerService brokerService, String topicName) {
+ public static Optional<DispatchRate> getTopicPolicyDispatchRate(BrokerService brokerService,
+ String topicName, Type type) {
Optional<DispatchRate> dispatchRate = Optional.empty();
final ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
- if (serviceConfiguration.isTopicLevelPoliciesEnabled()) {
+ if (serviceConfiguration.isSystemTopicEnabled() && serviceConfiguration.isTopicLevelPoliciesEnabled()) {
try {
- dispatchRate = Optional.ofNullable(brokerService.pulsar()
- .getTopicPoliciesService().getTopicPolicies(TopicName.get(topicName)))
- .map(TopicPolicies::getDispatchRate);
- } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e){
+ switch (type) {
+ case TOPIC:
+ dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
+ .getTopicPolicies(TopicName.get(topicName)))
+ .map(TopicPolicies::getDispatchRate);
+ break;
+ case SUBSCRIPTION:
+ dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
+ .getTopicPolicies(TopicName.get(topicName)))
+ .map(TopicPolicies::getSubscriptionDispatchRate);
+ break;
+ }
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.debug("Topic {} policies cache have not init.", topicName);
} catch (Exception e) {
- log.debug("[{}] Failed to get topic policies. Exception: {}", topicName, e);
+ log.debug("[{}] Failed to get topic dispatch rate. ", topicName, e);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f83677e..91c8b7d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1873,7 +1873,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(Consumer::checkPermissions);
Dispatcher dispatcher = sub.getDispatcher();
- if (dispatcher != null) {
+ // If the topic-level policy already exists, the namespace-level policy cannot override
+ // the topic-level policy.
+ if (dispatcher != null && (topicPolicies == null || !topicPolicies.isSubscriptionDispatchRateSet())) {
dispatcher.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data));
}
});
@@ -2389,6 +2391,18 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
});
+ // Propagate the topic-level subscription dispatch rate to every subscription's rate limiter.
+ subscriptions.forEach((subName, sub) -> {
+     sub.getConsumers().forEach(Consumer::checkPermissions);
+     Dispatcher dispatcher = sub.getDispatcher();
+     // The namespace-policy update path in this class null-checks the dispatcher before using it;
+     // apply the same guard here instead of dereferencing it unconditionally.
+     if (dispatcher == null) {
+         return;
+     }
+     dispatcher.getRateLimiter().ifPresent(rateLimiter -> {
+         if (policies.isSubscriptionDispatchRateSet()) {
+             // A topic-level rate is present: apply it directly.
+             rateLimiter.updateDispatchRate(policies.getSubscriptionDispatchRate());
+         } else {
+             // No topic-level rate: let the limiter re-resolve its rate on its own.
+             rateLimiter.updateDispatchRate();
+         }
+     });
+ });
+
if (policies.getPublishRate() != null) {
topicPolicyPublishRate = policies.getPublishRate();
updateTopicPublishDispatcher();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
index f0f8d79..2309107 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
@@ -154,6 +154,27 @@ public class TopicPoliciesDisableTest extends MockedPulsarServiceBaseTest {
}
@Test
+ public void testSubscriptionDispatchRateDisabled() throws Exception {
+ DispatchRate dispatchRate = new DispatchRate(1000,
+ 1020*1024, 1);
+ log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic);
+
+ try {
+ admin.topics().setSubscriptionDispatchRate(testTopic, dispatchRate);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 405);
+ }
+
+ try {
+ admin.topics().getSubscriptionDispatchRate(testTopic);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 405);
+ }
+ }
+
+ @Test
public void testCompactionThresholdDisabled() {
Long compactionThreshold = 10000L;
log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index c0e7655..803cc36 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -561,6 +561,134 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
}
@Test
+ public void testGetSetSubscriptionDispatchRate() throws Exception {
+ final String topic = testTopic + UUID.randomUUID();
+ admin.topics().createNonPartitionedTopic(topic);
+ Producer producer = pulsarClient.newProducer().topic(topic).create();
+ producer.close();
+ Thread.sleep(3000);
+
+ DispatchRate dispatchRate = new DispatchRate(1000,
+ 1024 * 1024, 1);
+ log.info("Subscription Dispatch Rate: {} will set to the topic: {}", dispatchRate, topic);
+
+ admin.topics().setSubscriptionDispatchRate(topic, dispatchRate);
+ log.info("Subscription dispatch rate set success on topic: {}", topic);
+
+ Thread.sleep(3000);
+
+ String subscriptionName = "test_subscription_rate";
+ Consumer consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();
+ Thread.sleep(3000);
+
+ DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
+ .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+ Assert.assertNotNull(dispatchRateLimiter);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg);
+
+
+ DispatchRate getDispatchRate = admin.topics().getSubscriptionDispatchRate(topic);
+ log.info("Subscription dispatch rate: {} get on topic: {}", getDispatchRate, topic);
+ Assert.assertEquals(getDispatchRate, dispatchRate);
+
+ producer.close();
+ admin.topics().delete(topic, true);
+ }
+
+ @Test
+ public void testRemoveSubscriptionDispatchRate() throws Exception {
+ final String topic = testTopic + UUID.randomUUID();
+ admin.topics().createNonPartitionedTopic(topic);
+ Producer producer = pulsarClient.newProducer().topic(topic).create();
+ producer.close();
+ Thread.sleep(3000);
+
+ DispatchRate dispatchRate = new DispatchRate(1000,
+ 1024 * 1024, 1);
+ log.info("Subscription Dispatch Rate: {} will set to the topic: {}", dispatchRate, topic);
+
+ admin.topics().setSubscriptionDispatchRate(topic, dispatchRate);
+ log.info("Subscription dispatch rate set success on topic: {}", topic);
+
+ Thread.sleep(3000);
+
+ String subscriptionName = "test_subscription_rate";
+ Consumer consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();
+ Thread.sleep(3000);
+
+ DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
+ .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+ Assert.assertNotNull(dispatchRateLimiter);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg);
+
+ DispatchRate getDispatchRate = admin.topics().getSubscriptionDispatchRate(topic);
+ log.info("Subscription dispatch rate: {} get on topic: {}", getDispatchRate, topic);
+
+ // remove subscription dispatch rate
+ admin.topics().removeSubscriptionDispatchRate(topic);
+ Thread.sleep(3000);
+ getDispatchRate = admin.topics().getSubscriptionDispatchRate(topic);
+ log.info("Subscription dispatch rate get on topic is {} after remove", getDispatchRate);
+ Assert.assertNull(getDispatchRate);
+
+ dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
+ .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+ Assert.assertNotEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg);
+ Assert.assertNotEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte);
+
+ producer.close();
+ admin.topics().delete(topic, true);
+ }
+
+ @Test
+ public void testSubscriptionDispatchRatePolicyOverwrittenNamespaceLevel() throws Exception {
+ final String topic = testTopic + UUID.randomUUID();
+ admin.topics().createNonPartitionedTopic(topic);
+ Producer producer = pulsarClient.newProducer().topic(topic).create();
+ producer.close();
+ Thread.sleep(3000);
+
+ // set namespace level subscription dispatch rate
+ DispatchRate namespaceDispatchRate = new DispatchRate(100, 1024 * 1024, 1);
+ admin.namespaces().setSubscriptionDispatchRate(myNamespace, namespaceDispatchRate);
+ Thread.sleep(3000);
+
+ String subscriptionName = "test_subscription_rate";
+ Consumer consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();
+
+ // get subscription dispatch Rate limiter
+ DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
+ .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), namespaceDispatchRate.dispatchThrottlingRateInMsg);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), namespaceDispatchRate.dispatchThrottlingRateInByte);
+
+ // set topic level subscription dispatch rate
+ DispatchRate topicDispatchRate = new DispatchRate(200, 2 * 1024 * 1024, 1);
+ admin.topics().setSubscriptionDispatchRate(topic, topicDispatchRate);
+ Thread.sleep(3000);
+
+ // get subscription dispatch rate limiter
+ dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get()
+ .getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), topicDispatchRate.dispatchThrottlingRateInByte);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), topicDispatchRate.dispatchThrottlingRateInMsg);
+
+ // remove topic level subscription dispatch rate limiter
+ admin.topics().removeSubscriptionDispatchRate(topic);
+ Thread.sleep(3000);
+
+ // get subscription dispatch rate limiter
+ dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get()
+ .getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), namespaceDispatchRate.dispatchThrottlingRateInByte);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), namespaceDispatchRate.dispatchThrottlingRateInMsg);
+
+ admin.topics().delete(topic, true);
+ }
+
+ @Test
public void testGetSetCompactionThreshold() throws Exception {
long compactionThreshold = 100000;
log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 1994ee2..e039083 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2008,6 +2008,70 @@ public interface Topics {
CompletableFuture<Void> removeDispatchRateAsync(String topic) throws PulsarAdminException;
/**
+ * Set subscription-message-dispatch-rate for the topic.
+ * <p/>
+ * Subscriptions on this topic are throttled to the given dispatch rate.
+ *
+ * @param topic
+ * Topic name
+ * @param dispatchRate
+ * dispatch rate to apply to the subscriptions of the topic
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setSubscriptionDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException;
+
+ /**
+ * Set subscription-message-dispatch-rate for the topic asynchronously.
+ * <p/>
+ * Subscriptions on this topic are throttled to the given dispatch rate.
+ *
+ * @param topic
+ * Topic name
+ * @param dispatchRate
+ * dispatch rate to apply to the subscriptions of the topic
+ */
+ CompletableFuture<Void> setSubscriptionDispatchRateAsync(String topic, DispatchRate dispatchRate);
+
+ /**
+ * Get subscription-message-dispatch-rate for the topic.
+ * <p/>
+ * Subscriptions on this topic are throttled to the returned dispatch rate.
+ *
+ * @param topic
+ * Topic name
+ * @return the subscription dispatch rate set on the topic, or {@code null} if none is set
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdminException;
+
+ /**
+ * Get subscription-message-dispatch-rate for the topic asynchronously.
+ * <p/>
+ * Subscriptions on this topic are throttled to the returned dispatch rate.
+ *
+ * @param topic
+ * Topic name
+ * @return a future completed with the subscription dispatch rate set on the topic,
+ * or with {@code null} if none is set
+ */
+ CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String topic);
+
+ /**
+ * Remove subscription-message-dispatch-rate for a topic.
+ * @param topic
+ * Topic name
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void removeSubscriptionDispatchRate(String topic) throws PulsarAdminException;
+
+ /**
+ * Remove subscription-message-dispatch-rate for a topic asynchronously.
+ * @param topic
+ * Topic name
+ */
+ CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String topic);
+
+ /**
* Get the compactionThreshold for a topic. The maximum number of bytes
* can have before compaction is triggered. 0 disables.
* <p/>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index eb30d9c..9238c07 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2165,6 +2165,82 @@ public class TopicsImpl extends BaseResource implements Topics {
}
@Override
+ public DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdminException {
+ try {
+ return getSubscriptionDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String topic) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "subscriptionDispatchRate");
+ final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<DispatchRate>() {
+ @Override
+ public void completed(DispatchRate dispatchRate) {
+ future.complete(dispatchRate);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setSubscriptionDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException {
+ try {
+ setSubscriptionDispatchRateAsync(topic, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setSubscriptionDispatchRateAsync(String topic, DispatchRate dispatchRate) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "subscriptionDispatchRate");
+ return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void removeSubscriptionDispatchRate(String topic) throws PulsarAdminException {
+ try {
+ removeSubscriptionDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String topic) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "subscriptionDispatchRate");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public Long getCompactionThreshold(String topic) throws PulsarAdminException {
try {
return getCompactionThresholdAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 9649cad..1c6a131 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -133,9 +133,15 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-offload-policies", new GetOffloadPolicies());
jcommander.addCommand("set-offload-policies", new SetOffloadPolicies());
jcommander.addCommand("remove-offload-policies", new RemoveOffloadPolicies());
+
jcommander.addCommand("get-dispatch-rate", new GetDispatchRate());
jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());
+
+ jcommander.addCommand("get-subscription-dispatch-rate", new GetSubscriptionDispatchRate());
+ jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate());
+ jcommander.addCommand("remove-subscription-dispatch-rate", new RemoveSubscriptionDispatchRate());
+
jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold());
jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold());
jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold());
@@ -1478,6 +1484,59 @@ public class CmdTopics extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get subscription message-dispatch-rate for a topic")
+ private class GetSubscriptionDispatchRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(admin.topics().getSubscriptionDispatchRate(persistentTopic));
+ }
+ }
+
+ @Parameters(commandDescription = "Set subscription message-dispatch-rate for a topic")
+ private class SetSubscriptionDispatchRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--msg-dispatch-rate",
+ "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+ private int msgDispatchRate = -1;
+
+ @Parameter(names = { "--byte-dispatch-rate",
+ "-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+ private long byteDispatchRate = -1;
+
+ @Parameter(names = { "--dispatch-rate-period",
+ "-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
+ private int dispatchRatePeriodSec = 1;
+
+ @Parameter(names = { "--relative-to-publish-rate",
+ "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
+ private boolean relativeToPublishRate = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().setSubscriptionDispatchRate(persistentTopic,
+ new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove subscription message-dispatch-rate for a topic")
+ private class RemoveSubscriptionDispatchRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().removeSubscriptionDispatchRate(persistentTopic);
+ }
+ }
+
@Parameters(commandDescription = "Get max number of producers for a topic")
private class GetMaxProducers extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index fb785b6..0e3440b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -52,6 +52,7 @@ public class TopicPolicies {
private OffloadPolicies offloadPolicies;
private InactiveTopicPolicies inactiveTopicPolicies = null;
private DispatchRate dispatchRate = null;
+ private DispatchRate subscriptionDispatchRate = null;
private Long compactionThreshold = null;
private PublishRate publishRate = null;
private SubscribeRate subscribeRate = null;
@@ -116,6 +117,10 @@ public class TopicPolicies {
return dispatchRate != null;
}
+ public boolean isSubscriptionDispatchRateSet() {
+ return subscriptionDispatchRate != null;
+ }
+
public boolean isCompactionThresholdSet() {
return compactionThreshold != null;
}