You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/01/14 09:47:40 UTC
[pulsar] branch master updated: Support dispatch rate policy at the
topic level (#9175)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7b65fab Support dispatch rate policy at the topic level (#9175)
7b65fab is described below
commit 7b65fab167ab776d78fbfd7482ca59fa2276f495
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Jan 14 17:47:07 2021 +0800
Support dispatch rate policy at the topic level (#9175)
Fixes #9143
### Motivation
The replicator dispatch rate policy is supported at the namespace level,
but it is not yet supported at the topic level, even though topic-level policies have been available since 2.7.0.
### Modifications
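Add a topic-level API (REST endpoints, admin client methods, and CLI commands) to get, set, and remove the replicator dispatch rate.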
### Verifying this change
ReplicatorRateLimiterTest#testReplicatorRatePriority
TopicPoliciesTest#testReplicatorRateApi
---
.../broker/admin/impl/PersistentTopicsBase.java | 12 ++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 84 ++++++++++++++++++++++
.../service/persistent/DispatchRateLimiter.java | 12 +++-
.../broker/service/persistent/PersistentTopic.java | 8 ++-
.../pulsar/broker/admin/TopicPoliciesTest.java | 21 ++++++
.../broker/service/ReplicatorRateLimiterTest.java | 74 ++++++++++++++++++-
.../org/apache/pulsar/client/admin/Topics.java | 64 +++++++++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 76 ++++++++++++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 10 +++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 57 +++++++++++++++
.../pulsar/common/policies/data/TopicPolicies.java | 5 ++
11 files changed, 418 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8503459..54b98b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2725,6 +2725,18 @@ public class PersistentTopicsBase extends AdminResource {
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}
+ protected Optional<DispatchRate> internalGetReplicatorDispatchRate() {
+ preValidation();
+ return getTopicPolicies(topicName).map(TopicPolicies::getReplicatorDispatchRate);
+ }
+
+ protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRate dispatchRate) {
+ preValidation();
+ TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
+ topicPolicies.setReplicatorDispatchRate(dispatchRate);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ }
+
private void preValidation() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 888a478..6217371 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1903,6 +1903,90 @@ public class PersistentTopics extends PersistentTopicsBase {
}
@GET
+ @Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
+ @ApiOperation(value = "Get replicatorDispatchRate config for specified topic.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405,
+ message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ try {
+ Optional<DispatchRate> dispatchRate = internalGetReplicatorDispatchRate();
+ if (dispatchRate.isPresent()) {
+ asyncResponse.resume(dispatchRate.get());
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ }
+ } catch (RestException e) {
+ asyncResponse.resume(e);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
+ @ApiOperation(value = "Set replicatorDispatchRate config for specified topic.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405,
+ message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 412, message = "Invalid value of replicatorDispatchRate")})
+ public void setReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Replicator dispatch rate of the topic")
+ DispatchRate dispatchRate) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalSetReplicatorDispatchRate(dispatchRate).whenComplete((r, ex) -> {
+ if (ex instanceof RestException) {
+ log.error("Updating replicatorDispatchRate failed", ex);
+ asyncResponse.resume(ex);
+ } else if (ex != null) {
+ log.error("Updating replicatorDispatchRate failed", ex);
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}"
+ + ", replicatorDispatchRate={}"
+ , clientAppId(), namespaceName, topicName.getLocalName(), dispatchRate);
+ asyncResponse.resume(Response.noContent().build());
+ }
+ });
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
+ @ApiOperation(value = "Remove replicatorDispatchRate config for specified topic.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405,
+ message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void removeReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalSetReplicatorDispatchRate(null).whenComplete((r, ex) -> {
+ if (ex != null) {
+ log.error("Failed to remove replicatorDispatchRate", ex);
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}",
+ clientAppId(), namespaceName, topicName.getLocalName());
+ asyncResponse.resume(Response.noContent().build());
+ }
+ });
+ }
+
+ @GET
@Path("/{tenant}/{namespace}/{topic}/maxProducers")
@ApiOperation(value = "Get maxProducers config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 7fb4d43..285d587 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -143,8 +143,9 @@ public class DispatchRateLimiter {
}
/**
- * Update dispatch-throttling-rate. gives first priority to namespace-policy configured dispatch rate else applies
- * default broker dispatch-throttling-rate
+ * Update dispatch-throttling-rate.
+ * Topic-level has the highest priority, then namespace-level, and finally use dispatch-throttling-rate in
+ * broker-level
*/
public void updateDispatchRate() {
Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
@@ -189,6 +190,13 @@ public class DispatchRateLimiter {
.getTopicPolicies(TopicName.get(topicName)))
.map(TopicPolicies::getSubscriptionDispatchRate);
break;
+ case REPLICATOR:
+ dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
+ .getTopicPolicies(TopicName.get(topicName)))
+ .map(TopicPolicies::getReplicatorDispatchRate);
+ break;
+ default:
+ break;
}
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.debug("Topic {} policies cache have not init.", topicName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9f8a447..0e0cc8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -497,11 +497,13 @@ public class PersistentTopic extends AbstractTopic
});
}
+ @Override
protected CompletableFuture<Long> incrementTopicEpoch(Optional<Long> currentEpoch) {
long newEpoch = currentEpoch.orElse(-1L) + 1;
return setTopicEpoch(newEpoch);
}
+ @Override
protected CompletableFuture<Long> setTopicEpoch(long newEpoch) {
CompletableFuture<Long> future = new CompletableFuture<>();
ledger.asyncSetProperty(TOPIC_EPOCH_PROPERTY_NAME, String.valueOf(newEpoch), new UpdatePropertiesCallback() {
@@ -2059,7 +2061,7 @@ public class PersistentTopic extends AbstractTopic
}
});
replicators.forEach((name, replicator) ->
- replicator.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data))
+ replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
);
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
@@ -2662,10 +2664,12 @@ public class PersistentTopic extends AbstractTopic
}
initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
- if (this.subscribeRateLimiter.isPresent() && policies != null) {
+ if (this.subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
}
+ replicators.forEach((name, replicator) -> replicator.getRateLimiter()
+ .ifPresent(DispatchRateLimiter::updateDispatchRate));
}
private Optional<Policies> getNamespacePolicies() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index f6e9eda..e8d09d7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -1387,4 +1387,25 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
c.close();
}
}
+
+ @Test(timeOut = 30000)
+ public void testReplicatorRateApi() throws Exception {
+ final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
+ // init cache
+ pulsarClient.newProducer().topic(topic).create().close();
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+
+ assertNull(admin.topics().getReplicatorDispatchRate(topic));
+
+ DispatchRate dispatchRate = new DispatchRate(100,200L,10);
+ admin.topics().setReplicatorDispatchRate(topic, dispatchRate);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
+ -> assertEquals(admin.topics().getReplicatorDispatchRate(topic), dispatchRate));
+
+ admin.topics().removeReplicatorDispatchRate(topic);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
+ -> assertNull(admin.topics().getReplicatorDispatchRate(topic)));
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
index 4c04ccf..9e68cc5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
@@ -31,7 +31,9 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -41,6 +43,10 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
/**
* Starts 3 brokers that are in 3 different clusters
*/
@@ -74,6 +80,73 @@ public class ReplicatorRateLimiterTest extends ReplicatorTestBase {
return new Object[][] { { DispatchRateType.messageRate }, { DispatchRateType.byteRate } };
}
+ @Test
+ public void testReplicatorRatePriority() throws Exception {
+ shutdown();
+ config1.setSystemTopicEnabled(true);
+ config1.setTopicLevelPoliciesEnabled(true);
+ config1.setDispatchThrottlingRatePerReplicatorInMsg(100);
+ config1.setDispatchThrottlingRatePerReplicatorInByte(200L);
+ setup();
+
+ final String namespace = "pulsar/replicatorchange-" + System.currentTimeMillis();
+ final String topicName = "persistent://" + namespace + "/ratechange";
+
+ admin1.namespaces().createNamespace(namespace);
+ // set 2 clusters, there will be 1 replicator in each topic
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+ @Cleanup
+ PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS).build();
+ client1.newProducer().topic(topicName).create().close();
+ PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
+ Awaitility.await().atMost(3, TimeUnit.SECONDS)
+ .until(() -> pulsar1.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));
+
+ //use broker-level by default
+ assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 100);
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 200L);
+
+ //set namespace-level policy, which should take effect
+ DispatchRate nsDispatchRate = new DispatchRate(50, 60L, 70);
+ admin1.namespaces().setReplicatorDispatchRate(namespace, nsDispatchRate);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), nsDispatchRate));
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 50);
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 60L);
+
+ //set topic-level policy, which should take effect
+ DispatchRate topicRate = new DispatchRate(10, 20L, 30);
+ admin1.topics().setReplicatorDispatchRate(topicName, topicRate);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
+ assertEquals(admin1.topics().getReplicatorDispatchRate(topicName), topicRate));
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10);
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);
+
+ //Set the namespace-level policy, which should not take effect
+ DispatchRate nsDispatchRate2 = new DispatchRate(500, 600L, 700);
+ admin1.namespaces().setReplicatorDispatchRate(namespace, nsDispatchRate2);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), nsDispatchRate2));
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);
+
+ //remove topic-level policy, namespace-level should take effect
+ admin1.topics().removeReplicatorDispatchRate(topicName);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
+ assertNull(admin1.topics().getReplicatorDispatchRate(topicName)));
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 500);
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
+ 600L);
+
+ //remove namespace-level policy, broker-level should take effect
+ admin1.namespaces().setReplicatorDispatchRate(namespace, null);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 100));
+ assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
+ 200L);
+ }
+
/**
* verifies dispatch rate for replicators get changed once namespace policies changed.
*
@@ -103,7 +176,6 @@ public class ReplicatorRateLimiterTest extends ReplicatorTestBase {
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.close();
-
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
// 1. default replicator throttling not configured
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 3c68986..3f275d4 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2162,6 +2162,70 @@ public interface Topics {
CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String topic);
/**
+ * Set replicatorDispatchRate for the topic.
+ * <p/>
+ * Replicator dispatch rate under this topic can dispatch this many messages per second
+ *
+ * @param topic
+ * @param dispatchRate
+ * number of messages per second
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setReplicatorDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException;
+
+ /**
+ * Set replicatorDispatchRate for the topic asynchronously.
+ * <p/>
+ * Replicator dispatch rate under this topic can dispatch this many messages per second.
+ *
+ * @param topic
+ * @param dispatchRate
+ * number of messages per second
+ */
+ CompletableFuture<Void> setReplicatorDispatchRateAsync(String topic, DispatchRate dispatchRate);
+
+ /**
+ * Get replicatorDispatchRate for the topic.
+ * <p/>
+ * Replicator dispatch rate under this topic can dispatch this many messages per second.
+ *
+ * @param topic
+ * @return DispatchRate
+ * number of messages per second
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ DispatchRate getReplicatorDispatchRate(String topic) throws PulsarAdminException;
+
+ /**
+ * Get replicatorDispatchRate asynchronously.
+ * <p/>
+ * Replicator dispatch rate under this topic can dispatch this many messages per second.
+ *
+ * @param topic
+ * @return DispatchRate
+ * number of messages per second
+ */
+ CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String topic);
+
+ /**
+ * Remove replicatorDispatchRate for a topic.
+ * @param topic
+ * Topic name
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void removeReplicatorDispatchRate(String topic) throws PulsarAdminException;
+
+ /**
+ * Remove replicatorDispatchRate for a topic asynchronously.
+ * @param topic
+ * Topic name
+ */
+ CompletableFuture<Void> removeReplicatorDispatchRateAsync(String topic);
+
+ /**
* Get the compactionThreshold for a topic. The maximum number of bytes
* can have before compaction is triggered. 0 disables.
* <p/>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 617ed32..eda3443 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2939,6 +2939,82 @@ public class TopicsImpl extends BaseResource implements Topics {
}
@Override
+ public DispatchRate getReplicatorDispatchRate(String topic) throws PulsarAdminException {
+ try {
+ return getReplicatorDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String topic) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "replicatorDispatchRate");
+ final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<DispatchRate>() {
+ @Override
+ public void completed(DispatchRate dispatchRate) {
+ future.complete(dispatchRate);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setReplicatorDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException {
+ try {
+ setReplicatorDispatchRateAsync(topic, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setReplicatorDispatchRateAsync(String topic, DispatchRate dispatchRate) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "replicatorDispatchRate");
+ return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void removeReplicatorDispatchRate(String topic) throws PulsarAdminException {
+ try {
+ removeReplicatorDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeReplicatorDispatchRateAsync(String topic) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "replicatorDispatchRate");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public SubscribeRate getSubscribeRate(String topic) throws PulsarAdminException {
try {
return getSubscribeRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 210d9d7..477f032 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -770,6 +770,16 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("disable-deduplication persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).enableDeduplication("persistent://myprop/clust/ns1/ds1", false);
+ cmdTopics.run(split("get-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopics).getReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1");
+
+ cmdTopics.run(split("set-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1 -md 10 -bd 11 -dt 12"));
+ verify(mockTopics).setReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1",
+ new DispatchRate(10,11,12));
+
+ cmdTopics.run(split("remove-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopics).removeReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1");
+
cmdTopics.run(split("get-deduplication-enabled persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getDeduplicationEnabled("persistent://myprop/clust/ns1/ds1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 8488e17..fff4a59 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -146,6 +146,10 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate());
jcommander.addCommand("remove-subscription-dispatch-rate", new RemoveSubscriptionDispatchRate());
+ jcommander.addCommand("get-replicator-dispatch-rate", new GetReplicatorDispatchRate());
+ jcommander.addCommand("set-replicator-dispatch-rate", new SetReplicatorDispatchRate());
+ jcommander.addCommand("remove-replicator-dispatch-rate", new RemoveReplicatorDispatchRate());
+
jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold());
jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold());
jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold());
@@ -1682,6 +1686,59 @@ public class CmdTopics extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get replicator message-dispatch-rate for a topic")
+ private class GetReplicatorDispatchRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String topic = validatePersistentTopic(params);
+ print(admin.topics().getReplicatorDispatchRate(topic));
+ }
+ }
+
+ @Parameters(commandDescription = "Set replicator message-dispatch-rate for a topic")
+ private class SetReplicatorDispatchRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--msg-dispatch-rate",
+ "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+ private int msgDispatchRate = -1;
+
+ @Parameter(names = { "--byte-dispatch-rate",
+ "-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+ private long byteDispatchRate = -1;
+
+ @Parameter(names = { "--dispatch-rate-period",
+ "-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
+ private int dispatchRatePeriodSec = 1;
+
+ @Parameter(names = { "--relative-to-publish-rate",
+ "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
+ private boolean relativeToPublishRate = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().setReplicatorDispatchRate(persistentTopic,
+ new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove replicator message-dispatch-rate for a topic")
+ private class RemoveReplicatorDispatchRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().removeReplicatorDispatchRate(persistentTopic);
+ }
+ }
+
@Parameters(commandDescription = "Get max number of producers for a topic")
private class GetMaxProducers extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index a6c7ebf..3c0c78e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -57,6 +57,11 @@ public class TopicPolicies {
private Integer deduplicationSnapshotIntervalSeconds = null;
private Integer maxMessageSize = null;
private Integer maxSubscriptionsPerTopic = null;
+ private DispatchRate replicatorDispatchRate = null;
+
+ public boolean isReplicatorDispatchRateSet() {
+ return replicatorDispatchRate != null;
+ }
public boolean isMaxSubscriptionsPerTopicSet() {
return maxSubscriptionsPerTopic != null;