You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/08/28 13:33:39 UTC
[pulsar] branch master updated: support message dispatch rate on
topic level (#7863)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 395037e support message dispatch rate on topic level (#7863)
395037e is described below
commit 395037e9cfb03a3cbbc22347e95fa76dae282898
Author: hangc0276 <ha...@163.com>
AuthorDate: Fri Aug 28 21:33:11 2020 +0800
support message dispatch rate on topic level (#7863)
### Motivation
Support message dispatch rate on topic level.
Based on the system topic function.
### Modifications
Support set message dispatch rate on topic level.
Support get message dispatch rate on topic level.
Support remove message dispatch rate on topic level.
---
.../broker/admin/impl/PersistentTopicsBase.java | 44 +++++++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 89 ++++++++++++++++++++++
.../SystemTopicBasedTopicPoliciesService.java | 18 +++++
.../broker/service/TopicPoliciesService.java | 18 +++++
.../pulsar/broker/service/TopicPolicyListener.java | 23 ++++++
.../service/persistent/DispatchRateLimiter.java | 40 ++++++++--
.../broker/service/persistent/PersistentTopic.java | 36 ++++++++-
.../broker/admin/TopicPoliciesDisableTest.java | 21 +++++
.../pulsar/broker/admin/TopicPoliciesTest.java | 40 ++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 68 +++++++++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 78 +++++++++++++++++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 57 ++++++++++++++
.../pulsar/common/policies/data/TopicPolicies.java | 5 ++
13 files changed, 530 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 51c8952..0123aa7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -95,6 +95,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
+import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
@@ -3056,4 +3057,47 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+
+ protected Optional<DispatchRate> internalGetDispatchRate() {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ return getTopicPolicies(topicName).map(TopicPolicies::getDispatchRate);
+ }
+
+ protected CompletableFuture<Void> internalSetDispatchRate(DispatchRate dispatchRate) {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ if (dispatchRate == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ TopicPolicies topicPolicies = getTopicPolicies(topicName)
+ .orElseGet(TopicPolicies::new);
+ topicPolicies.setDispatchRate(dispatchRate);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ }
+
+ protected CompletableFuture<Void> internalRemoveDispatchRate() {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
+ if (!topicPolicies.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ topicPolicies.get().setDispatchRate(null);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+
+ }
+
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 521fb75..c00f270 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -30,6 +30,7 @@ import io.swagger.annotations.ApiResponses;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
@@ -55,6 +56,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
@@ -1695,5 +1697,92 @@ public class PersistentTopics extends PersistentTopicsBase {
}
}
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/dispatchRate")
+ @ApiOperation(value = "Get dispatch rate configuration for specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void getDispatchRate(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ try {
+ Optional<DispatchRate> dispatchRate = internalGetDispatchRate();
+ if (!dispatchRate.isPresent()) {
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ asyncResponse.resume(dispatchRate.get());
+ }
+ } catch (RestException e) {
+ asyncResponse.resume(e);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/dispatchRate")
+ @ApiOperation(value = "Set message dispatch rate configuration for specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void setDispatchRate(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Dispatch rate for the specified topic") DispatchRate dispatchRate) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalSetDispatchRate(dispatchRate).whenComplete((r, ex) -> {
+ if (ex instanceof RestException) {
+ log.error("Failed to set topic dispatch rate", ex);
+ asyncResponse.resume(ex);
+ } else if (ex != null) {
+ log.error("Failed to set topic dispatch rate");
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ try {
+ log.info("[{}] Successfully set topic dispatch rate: tenant={}, namespace={}, topic={}, dispatchRate={}",
+ clientAppId(),
+ tenant,
+ namespace,
+ topicName.getLocalName(),
+ jsonMapper().writeValueAsString(dispatchRate));
+ } catch (JsonProcessingException ignore) {}
+ asyncResponse.resume(Response.noContent().build());
+ }
+ });
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/dispatchRate")
+ @ApiOperation(value = "Remove message dispatch rate configuration for specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void removeDispatchRate(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalRemoveDispatchRate().whenComplete((r, ex) -> {
+ if (ex != null) {
+ log.error("Failed to remove topic dispatch rate", ex);
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ log.info("[{}] Successfully remove topic dispatch rate: tenant={}, namespace={}, topic={}",
+ clientAppId(),
+ tenant,
+ namespace,
+ topicName.getLocalName());
+ asyncResponse.resume(Response.noContent().build());
+ }
+ });
+ }
+
private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 7c561c9..84add91 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -112,8 +113,15 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
});
})
);
+ if (listeners.get(topicName) != null) {
+ for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
+ listener.onUpdate(policies);
+ }
+ }
}
});
+
+
return result;
}
@@ -329,5 +337,15 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
return policyCacheInitMap.get(namespaceName);
}
+ @Override
+ public void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
+ listeners.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).add(listener);
+ }
+
+ @Override
+ public void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
+ listeners.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).remove(listener);
+ }
+
private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 1d9c382..9fde3a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -24,7 +24,10 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Topic policies service
@@ -32,6 +35,7 @@ import java.util.concurrent.CompletableFuture;
public interface TopicPoliciesService {
TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled();
+ Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
/**
* Update policies for a topic async
@@ -73,6 +77,10 @@ public interface TopicPoliciesService {
*/
void start();
+ void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener);
+
+ void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener);
+
class TopicPoliciesServiceDisabled implements TopicPoliciesService {
@Override
@@ -106,5 +114,15 @@ public interface TopicPoliciesService {
public void start() {
//No-op
}
+
+ @Override
+ public void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
+ //No-op
+ }
+
+ @Override
+ public void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
+ //No-op
+ }
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPolicyListener.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPolicyListener.java
new file mode 100644
index 0000000..0e39edb
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPolicyListener.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+public interface TopicPolicyListener<T> {
+ void onUpdate(T data);
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 29eed73..4cc711f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -29,10 +29,12 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -147,23 +149,51 @@ public class DispatchRateLimiter {
* default broker dispatch-throttling-rate
*/
public void updateDispatchRate() {
- DispatchRate dispatchRate = getPoliciesDispatchRate(brokerService);
+ Optional<DispatchRate> dispatchRate = getSystemTopicDispatchRate(brokerService, topicName);
+ if (!dispatchRate.isPresent()) {
+ dispatchRate =Optional.ofNullable(getPoliciesDispatchRate(brokerService));
- if (dispatchRate == null) {
- dispatchRate = createDispatchRate();
+ if (!dispatchRate.isPresent()) {
+ dispatchRate = Optional.of(createDispatchRate());
+ }
}
- updateDispatchRate(dispatchRate);
- log.info("[{}] configured {} message-dispatch rate at broker {}", this.topicName, type, dispatchRate);
+ updateDispatchRate(dispatchRate.get());
+ log.info("[{}] configured {} message-dispatch rate at broker {}", this.topicName, type, dispatchRate.get());
}
public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
String topicName, Type type) {
final ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
+ if (serviceConfig.isTopicLevelPoliciesEnabled() && type == Type.TOPIC) {
+ Optional<DispatchRate> dispatchRate = getSystemTopicDispatchRate(brokerService, topicName);
+ if (dispatchRate.isPresent()) {
+ return true;
+ }
+ }
+
policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName);
return isDispatchRateNeeded(serviceConfig, policies, topicName, type);
}
+ public static Optional<DispatchRate> getSystemTopicDispatchRate(BrokerService brokerService, String topicName) {
+ Optional<DispatchRate> dispatchRate = Optional.empty();
+ final ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
+ if (serviceConfiguration.isTopicLevelPoliciesEnabled()) {
+ try {
+ dispatchRate = Optional.ofNullable(brokerService.pulsar()
+ .getTopicPoliciesService().getTopicPolicies(TopicName.get(topicName)))
+ .map(TopicPolicies::getDispatchRate);
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e){
+ log.debug("Topic {} policies cache have not init.", topicName);
+ } catch (Exception e) {
+ log.debug("[{}] Failed to get topic policies. Exception: {}", topicName, e);
+ }
+ }
+
+ return dispatchRate;
+ }
+
public static boolean isDispatchRateNeeded(final ServiceConfiguration serviceConfig,
final Optional<Policies> policies, final String topicName, final Type type) {
DispatchRate dispatchRate = getPoliciesDispatchRate(serviceConfig.getClusterName(), policies, type);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 711867c..3899313 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+
import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -87,7 +88,7 @@ import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
-import org.apache.pulsar.broker.service.TopicPoliciesService;
+import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
@@ -133,7 +134,7 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback {
+public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback, TopicPolicyListener<TopicPolicies> {
// Managed ledger associated with the topic
protected final ManagedLedger ledger;
@@ -225,6 +226,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
initializeDispatchRateLimiterIfNeeded(Optional.empty());
+ brokerService.getPulsar().getTopicPoliciesService().registerListener(TopicName.get(topic), this);
this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
@@ -901,6 +903,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
+ brokerService.pulsar().getTopicPoliciesService().unregisterListener(TopicName.get(topic), getPersistentTopic());
log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
}
@@ -989,6 +992,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
+ brokerService.pulsar().getTopicPoliciesService().unregisterListener(TopicName.get(topic), getPersistentTopic());
log.info("[{}] Topic closed", topic);
closeFuture.complete(null);
}
@@ -2344,6 +2348,34 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return maxUnackedMessagesOnSubscription;
}
+ @Override
+ public void onUpdate(TopicPolicies policies) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] update topic policy: {}", topic, policies);
+ }
+
+ initializeTopicDispatchRateLimiterIfNeeded(Optional.ofNullable(policies));
+ if (this.dispatchRateLimiter.isPresent() && policies != null
+ && policies.getDispatchRate() != null) {
+ dispatchRateLimiter.ifPresent(dispatchRateLimiter ->
+ dispatchRateLimiter.updateDispatchRate(policies.getDispatchRate()));
+ }
+ }
+
+ private void initializeTopicDispatchRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
+ synchronized (dispatchRateLimiter) {
+ if (!dispatchRateLimiter.isPresent() && policies.isPresent() &&
+ policies.get().getDispatchRate() != null) {
+ this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
+ }
+ }
+ }
+
+ private PersistentTopic getPersistentTopic() {
+ return this;
+ }
+
+
@VisibleForTesting
public MessageDeduplication getMessageDeduplication() {
return messageDeduplication;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
index 819cb5c..329c3c7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -129,4 +130,24 @@ public class TopicPoliciesDisableTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(e.getStatusCode(), 405);
}
}
+
+ @Test
+ public void testDispatchRateDisabled() throws Exception {
+ DispatchRate dispatchRate = new DispatchRate();
+ log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic);
+
+ try {
+ admin.topics().setDispatchRate(testTopic, dispatchRate);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 405);
+ }
+
+ try {
+ admin.topics().getDispatchRate(testTopic);
+ Assert.fail();
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(), 405);
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 903ddce..ef16adb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -337,4 +338,43 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
admin.topics().deletePartitionedTopic(testTopic, true);
}
+
+
+ @Test
+ public void testGetSetDispatchRate() throws Exception {
+ DispatchRate dispatchRate = new DispatchRate(100, 10000, 1, true);
+ log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic);
+
+ admin.topics().setDispatchRate(testTopic, dispatchRate);
+ log.info("Dispatch Rate set success on topic: {}", testTopic);
+
+ Thread.sleep(3000);
+ DispatchRate getDispatchRate = admin.topics().getDispatchRate(testTopic);
+ log.info("Dispatch Rate: {} get on topic: {}", getDispatchRate, testTopic);
+ Assert.assertEquals(getDispatchRate, dispatchRate);
+
+ admin.topics().deletePartitionedTopic(testTopic, true);
+ }
+
+ @Test
+ public void testRemoveDispatchRate() throws Exception {
+ DispatchRate dispatchRate = new DispatchRate(100, 10000, 1, true);
+ log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic);
+
+ admin.topics().setDispatchRate(testTopic, dispatchRate);
+ log.info("Dispatch Rate set success on topic: {}", testTopic);
+
+ Thread.sleep(3000);
+ DispatchRate getDispatchRate = admin.topics().getDispatchRate(testTopic);
+ log.info("Dispatch Rate: {} get on topic: {}", getDispatchRate, testTopic);
+ Assert.assertEquals(getDispatchRate, dispatchRate);
+
+ admin.topics().removeDispatchRate(testTopic);
+ Thread.sleep(3000);
+ log.info("Dispatch Rate get on topic: {} after remove", getDispatchRate, testTopic);
+ getDispatchRate = admin.topics().getDispatchRate(testTopic);
+ Assert.assertNull(getDispatchRate);
+
+ admin.topics().deletePartitionedTopic(testTopic, true);
+ }
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 6476f27..93d3192 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -1845,4 +1846,71 @@ public interface Topics {
* @return
*/
CompletableFuture<Void> disableDeduplicationAsync(String topic);
+
+ /**
+ * Set message-dispatch-rate (topic can dispatch this many messages per second).
+ *
+ * @param topic
+ * @param dispatchRate
+ * number of messages per second
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException;
+
+ /**
+ * Set message-dispatch-rate asynchronously.
+ * <p/>
+ * topic can dispatch this many messages per second
+ *
+ * @param topic
+ * @param dispatchRate
+ * number of messages per second
+ */
+ CompletableFuture<Void> setDispatchRateAsync(String topic, DispatchRate dispatchRate);
+
+ /**
+ * Get message-dispatch-rate (topic can dispatch this many messages per second).
+ *
+ * @param topic
+ * @returns messageRate
+ * number of messages per second
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ DispatchRate getDispatchRate(String topic) throws PulsarAdminException;
+
+ /**
+ * Get message-dispatch-rate asynchronously.
+ * <p/>
+ * Topic can dispatch this many messages per second.
+ *
+ * @param topic
+ * @returns messageRate
+ * number of messages per second
+ */
+ CompletableFuture<DispatchRate> getDispatchRateAsync(String topic);
+
+ /**
+ * Remove message-dispatch-rate.
+ * <p/>
+ * Remove topic message dispatch rate
+ *
+ * @param topic
+ * @throws PulsarAdminException
+ * unexpected error
+ */
+ void removeDispatchRate(String topic) throws PulsarAdminException;
+
+ /**
+ * Remove message-dispatch-rate asynchronously.
+ * <p/>
+ * Remove topic message dispatch rate
+ *
+ * @param topic
+ * @throws PulsarAdminException
+ * unexpected error
+ */
+ CompletableFuture<Void> removeDispatchRateAsync(String topic) throws PulsarAdminException;
+
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 62a39b9..ffd9339 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -69,6 +69,7 @@ import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -1926,5 +1927,82 @@ public class TopicsImpl extends BaseResource implements Topics {
return asyncDeleteRequest(path);
}
+ @Override
+ public DispatchRate getDispatchRate(String topic) throws PulsarAdminException {
+ try {
+ return getDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<DispatchRate> getDispatchRateAsync(String topic) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "dispatchRate");
+ final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<DispatchRate>() {
+ @Override
+ public void completed(DispatchRate dispatchRate) {
+ future.complete(dispatchRate);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException {
+ try {
+ setDispatchRateAsync(topic, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setDispatchRateAsync(String topic, DispatchRate dispatchRate) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "dispatchRate");
+ return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void removeDispatchRate(String topic) throws PulsarAdminException {
+ try {
+ removeDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeDispatchRateAsync(String topic) {
+ TopicName topicName = validateTopic(topic);
+ WebTarget path = topicPath(topicName, "dispatchRate");
+ return asyncDeleteRequest(path);
+ }
+
+
private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 22e514e..b84794e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -119,6 +120,9 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-persistence", new GetPersistence());
jcommander.addCommand("set-persistence", new SetPersistence());
jcommander.addCommand("remove-persistence", new RemovePersistence());
+ jcommander.addCommand("get-dispatch-rate", new GetDispatchRate());
+ jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
+ jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());
}
@Parameters(commandDescription = "Get the list of topics under a namespace.")
@@ -1105,4 +1109,57 @@ public class CmdTopics extends CmdBase {
admin.topics().removePersistence(persistentTopic);
}
}
+
+ @Parameters(commandDescription = "Get message dispatch rate for a topic")
+ private class GetDispatchRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(admin.topics().getDispatchRate(persistentTopic));
+ }
+ }
+
+ @Parameters(commandDescription = "Set message dispatch rate for a topic")
+ private class SetDispatchRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--msg-dispatch-rate",
+ "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+ private int msgDispatchRate = -1;
+
+ @Parameter(names = { "--byte-dispatch-rate",
+ "-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+ private long byteDispatchRate = -1;
+
+ @Parameter(names = { "--dispatch-rate-period",
+ "-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
+ private int dispatchRatePeriodSec = 1;
+
+ @Parameter(names = { "--relative-to-publish-rate",
+ "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
+ private boolean relativeToPublishRate = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().setDispatchRate(persistentTopic,
+ new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove message dispatch rate for a topic")
+ private class RemoveDispatchRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().removeDispatchRate(persistentTopic);
+ }
+ }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 7864e0e..0e459c6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -49,6 +49,7 @@ public class TopicPolicies {
private Integer maxUnackedMessagesOnSubscription = null;
private Long delayedDeliveryTickTimeMillis = null;
private Boolean delayedDeliveryEnabled = null;
+ private DispatchRate dispatchRate = null;
public boolean isMaxUnackedMessagesOnConsumerSet() {
return maxUnackedMessagesOnConsumer != null;
@@ -97,4 +98,8 @@ public class TopicPolicies {
public boolean isMaxConsumersPerSubscriptionSet() {
return maxConsumersPerSubscription != null;
}
+
+ public boolean isDispatchRateSet() {
+ return dispatchRate != null;
+ }
}