You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/08 06:25:12 UTC
[pulsar] branch branch-2.10 updated: [Branch-2.10][Cherry-pick] tidy update subscriptions dispatcher rate-limiter (#16778)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new a88e96ec9c4 [Branch-2.10][Cherry-pick] tidy update subscriptions dispatcher rate-limiter (#16778)
a88e96ec9c4 is described below
commit a88e96ec9c466620bac478ed4fc5eff4c88725ff
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Mon Aug 8 14:25:05 2022 +0800
[Branch-2.10][Cherry-pick] tidy update subscriptions dispatcher rate-limiter (#16778)
---
.../pulsar/broker/service/AbstractTopic.java | 58 +++++++++
.../pulsar/broker/service/BrokerService.java | 38 ++++++
.../org/apache/pulsar/broker/service/Topic.java | 5 +
.../service/persistent/DispatchRateLimiter.java | 3 +
.../broker/service/persistent/PersistentTopic.java | 116 +++++++++---------
.../service/persistent/SubscribeRateLimiter.java | 76 +-----------
.../PrecisTopicPublishRateThrottleTest.java | 42 +++++++
.../pulsar/broker/service/ServerCnxTest.java | 5 +
.../pulsar/broker/service/SubscribeRateTest.java | 131 +++++++++++++++++++++
.../pulsar/common/policies/data/SubscribeRate.java | 10 ++
.../policies/data/HierarchyTopicPolicies.java | 4 +
11 files changed, 354 insertions(+), 134 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 917dec0b423..f9efa8ea186 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -65,6 +65,7 @@ import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -157,6 +158,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
this.preciseTopicPublishRateLimitingEnable = config.isPreciseTopicPublishRateLimiterEnable();
}
+ public SubscribeRate getSubscribeRate() {
+ return this.topicPolicies.getSubscribeRate().get();
+ }
+
public DispatchRateImpl getSubscriptionDispatchRate() {
return this.topicPolicies.getSubscriptionDispatchRate().get();
}
@@ -169,6 +174,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
return this.topicPolicies.getReplicatorDispatchRate().get();
}
+ public DispatchRateImpl getDispatchRate() {
+ return this.topicPolicies.getDispatchRate().get();
+ }
+
private SchemaCompatibilityStrategy formatSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
return strategy == SchemaCompatibilityStrategy.UNDEFINED ? null : strategy;
}
@@ -204,8 +213,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
topicPolicies.getReplicatorDispatchRate().updateTopicValue(normalize(data.getReplicatorDispatchRate()));
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
+ topicPolicies.getSubscribeRate().updateTopicValue(SubscribeRate.normalize(data.getSubscribeRate()));
topicPolicies.getSubscriptionDispatchRate().updateTopicValue(normalize(data.getSubscriptionDispatchRate()));
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
+ topicPolicies.getDispatchRate().updateTopicValue(normalize(data.getDispatchRate()));
}
protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
@@ -247,9 +258,24 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
type -> this.topicPolicies.getBackLogQuotaMap().get(type)
.updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
+ updateNamespaceSubscribeRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
updateNamespaceSubscriptionDispatchRate(namespacePolicies,
brokerService.getPulsar().getConfig().getClusterName());
updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
+ updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
+ }
+
+ private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
+ DispatchRateImpl dispatchRate = namespacePolicies.topicDispatchRate.get(cluster);
+ if (dispatchRate == null) {
+ dispatchRate = namespacePolicies.clusterDispatchRate.get(cluster);
+ }
+ topicPolicies.getDispatchRate().updateNamespaceValue(normalize(dispatchRate));
+ }
+
+ private void updateNamespaceSubscribeRate(Policies namespacePolicies, String cluster) {
+ topicPolicies.getSubscribeRate()
+ .updateNamespaceValue(SubscribeRate.normalize(namespacePolicies.clusterSubscribeRate.get(cluster)));
}
private void updateNamespaceSubscriptionDispatchRate(Policies namespacePolicies, String cluster) {
@@ -335,9 +361,26 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
if (isSystemTopic()) {
schemaCompatibilityStrategy = config.getSystemTopicSchemaCompatibilityStrategy();
}
+ topicPolicies.getSubscribeRate().updateBrokerValue(subscribeRateInBroker(config));
topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(subscriptionDispatchRateInBroker(config));
topicPolicies.getSchemaCompatibilityStrategy()
.updateBrokerValue(formatSchemaCompatibilityStrategy(schemaCompatibilityStrategy));
+ topicPolicies.getDispatchRate().updateBrokerValue(dispatchRateInBroker(config));
+ }
+
+ private DispatchRateImpl dispatchRateInBroker(ServiceConfiguration config) {
+ return DispatchRateImpl.builder()
+ .dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerTopicInMsg())
+ .dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerTopicInByte())
+ .ratePeriodInSecond(1)
+ .build();
+ }
+
+ private SubscribeRate subscribeRateInBroker(ServiceConfiguration config) {
+ return new SubscribeRate(
+ config.getSubscribeThrottlingRatePerConsumer(),
+ config.getSubscribeRatePeriodPerConsumerInSecond()
+ );
}
private DispatchRateImpl subscriptionDispatchRateInBroker(ServiceConfiguration config) {
@@ -1173,4 +1216,19 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
topicPolicies.getReplicatorDispatchRate().updateBrokerValue(
replicatorDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
}
+
+ public void updateBrokerDispatchRate() {
+ topicPolicies.getDispatchRate().updateBrokerValue(
+ dispatchRateInBroker(brokerService.pulsar().getConfiguration()));
+ }
+
+ public void updateBrokerPublishRate() {
+ topicPolicies.getPublishRate().updateBrokerValue(
+ publishRateInBroker(brokerService.pulsar().getConfiguration()));
+ }
+
+ public void updateBrokerSubscribeRate() {
+ topicPolicies.getSubscribeRate().updateBrokerValue(
+ subscribeRateInBroker(brokerService.pulsar().getConfiguration()));
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4b07087759e..dab1c349aed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2216,6 +2216,21 @@ public class BrokerService implements Closeable {
(publisherThrottlingTickTimeMillis) -> {
setupBrokerPublishRateLimiterMonitor();
});
+
+ // add listener to update topic publish-rate dynamic config
+ registerConfigurationListener("maxPublishRatePerTopicInMessages",
+ maxPublishRatePerTopicInMessages -> updateMaxPublishRatePerTopicInMessages()
+ );
+ registerConfigurationListener("maxPublishRatePerTopicInBytes",
+ maxPublishRatePerTopicInMessages -> updateMaxPublishRatePerTopicInMessages()
+ );
+
+ // add listener to update subscribe-rate dynamic config
+ registerConfigurationListener("subscribeThrottlingRatePerConsumer",
+ subscribeThrottlingRatePerConsumer -> updateSubscribeRate());
+ registerConfigurationListener("subscribeRatePeriodPerConsumerInSecond",
+ subscribeRatePeriodPerConsumerInSecond -> updateSubscribeRate());
+
// add listener to notify broker publish-rate dynamic config
registerConfigurationListener("brokerPublisherThrottlingMaxMessageRate",
(brokerPublisherThrottlingMaxMessageRate) ->
@@ -2252,6 +2267,26 @@ public class BrokerService implements Closeable {
}
}
+ private void updateMaxPublishRatePerTopicInMessages() {
+ this.pulsar().getExecutor().submit(() ->
+ forEachTopic(topic -> {
+ if (topic instanceof AbstractTopic) {
+ ((AbstractTopic) topic).updateBrokerPublishRate();
+ ((AbstractTopic) topic).updatePublishDispatcher();
+ }
+ }));
+ }
+
+ private void updateSubscribeRate() {
+ this.pulsar().getExecutor().submit(() ->
+ forEachTopic(topic -> {
+ if (topic instanceof PersistentTopic) {
+ ((PersistentTopic) topic).updateBrokerSubscribeRate();
+ ((PersistentTopic) topic).updateSubscribeRateLimiter();
+ }
+ }));
+ }
+
private void updateBrokerPublisherThrottlingMaxRate() {
int currentMaxMessageRate = pulsar.getConfiguration().getBrokerPublisherThrottlingMaxMessageRate();
long currentMaxByteRate = pulsar.getConfiguration().getBrokerPublisherThrottlingMaxByteRate();
@@ -2284,6 +2319,9 @@ public class BrokerService implements Closeable {
this.pulsar().getExecutor().execute(() -> {
// update message-rate for each topic
forEachTopic(topic -> {
+ if (topic instanceof AbstractTopic) {
+ ((AbstractTopic) topic).updateBrokerDispatchRate();
+ }
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().updateDispatchRate();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index fc1eadf47e4..e2ffb41390a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
+import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
@@ -285,6 +286,10 @@ public interface Topic {
return Optional.empty();
}
+ default Optional<SubscribeRateLimiter> getSubscribeRateLimiter() {
+ return Optional.empty();
+ }
+
default Optional<DispatchRateLimiter> getBrokerDispatchRateLimiter() {
return Optional.empty();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 3247494577a..cd7f5e9ea64 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -170,6 +170,9 @@ public class DispatchRateLimiter {
*/
public void updateDispatchRate() {
switch (type) {
+ case TOPIC:
+ updateDispatchRate(topic.getDispatchRate());
+ return;
case SUBSCRIPTION:
updateDispatchRate(topic.getSubscriptionDispatchRate());
return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5be91078b14..51c27bfd8f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service.persistent;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
@@ -135,6 +136,7 @@ import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerI
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
@@ -260,7 +262,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
.build();
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
- initializeRateLimiterIfNeeded(Optional.empty());
registerTopicPolicyListener();
this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
@@ -316,6 +317,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
isEncryptionRequired = false;
updatePublishDispatcher();
updateResourceGroupLimiter(optPolicies);
+ initializeDispatchRateLimiterIfNeeded();
+ updateSubscribeRateLimiter();
return;
}
@@ -323,6 +326,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
this.updateTopicPolicyByNamespacePolicy(policies);
+ initializeDispatchRateLimiterIfNeeded();
+
+ updateSubscribeRateLimiter();
+
updatePublishDispatcher();
updateResourceGroupLimiter(optPolicies);
@@ -367,27 +374,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
}
- private void initializeRateLimiterIfNeeded(Optional<Policies> policies) {
+ private void initializeDispatchRateLimiterIfNeeded() {
synchronized (dispatchRateLimiterLock) {
// dispatch rate limiter for topic
- if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
- .isDispatchRateNeeded(brokerService, policies, topic, Type.TOPIC)) {
+ if (!dispatchRateLimiter.isPresent()
+ && DispatchRateLimiter.isDispatchRateEnabled(topicPolicies.getDispatchRate().get())) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
}
- boolean isDispatchRateNeeded = SubscribeRateLimiter.isDispatchRateNeeded(brokerService, policies, topic);
- if (!subscribeRateLimiter.isPresent() && isDispatchRateNeeded) {
- this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
- } else if (!isDispatchRateNeeded) {
- this.subscribeRateLimiter = Optional.empty();
- }
-
- // dispatch rate limiter for each subscription
- subscriptions.forEach((name, subscription) -> {
- Dispatcher dispatcher = subscription.getDispatcher();
- if (dispatcher != null) {
- dispatcher.initializeDispatchRateLimiterIfNeeded();
- }
- });
}
}
@@ -434,6 +427,24 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
}
+ public void updateSubscribeRateLimiter() {
+ SubscribeRate subscribeRate = getSubscribeRate();
+ synchronized (subscribeRateLimiter) {
+ if (isSubscribeRateEnabled(subscribeRate)) {
+ if (subscribeRateLimiter.isPresent()) {
+ this.subscribeRateLimiter.get().onSubscribeRateUpdate(subscribeRate);
+ } else {
+ this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
+ }
+ } else {
+ if (subscribeRateLimiter.isPresent()) {
+ subscribeRateLimiter.get().close();
+ subscribeRateLimiter = Optional.empty();
+ }
+ }
+ }
+ }
+
private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) {
if (brokerService.isBrokerEntryMetadataEnabled()) {
ledger.asyncAddEntry(headersAndPayload,
@@ -2375,10 +2386,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
schemaValidationEnforced = data.schema_validation_enforced;
- //If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
- Optional<TopicPolicies> topicPolicies = getTopicPolicies();
+ initializeDispatchRateLimiterIfNeeded();
- initializeRateLimiterIfNeeded(Optional.ofNullable(data));
+ updateSubscribeRateLimiter();
updatePublishDispatcher();
@@ -2389,36 +2399,16 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> {
- List<CompletableFuture<Void>> subscriptionCheckFutures = new ArrayList<>((int) subscriptions.size());
- subscriptions.forEach((subName, sub) -> {
- List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>(sub.getConsumers().size());
- sub.getConsumers().forEach(consumer -> consumerCheckFutures.add(consumer.checkPermissionsAsync()));
- subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
- Dispatcher dispatcher = sub.getDispatcher();
- // If the topic-level policy already exists, the namespace-level policy cannot override
- // the topic-level policy.
- if (dispatcher != null && (!topicPolicies.isPresent() || !topicPolicies.get()
- .isSubscriptionDispatchRateSet())) {
- dispatcher.getRateLimiter()
- .ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data));
- }
- }));
- });
-
- return FutureUtil.waitForAll(subscriptionCheckFutures).thenCompose((___) -> {
+ return updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> {
replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
// update rate-limiter if policies updated
- if (this.dispatchRateLimiter.isPresent()) {
- if (!topicPolicies.isPresent() || !topicPolicies.get().isDispatchRateSet()) {
- dispatchRateLimiter.get().onPoliciesUpdate(data);
- }
- }
+ dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
if (this.subscribeRateLimiter.isPresent()) {
- subscribeRateLimiter.get().onPoliciesUpdate(data);
+ subscribeRateLimiter.get().onSubscribeRateUpdate(getSubscribeRate());
}
return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture,
@@ -3039,30 +3029,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
Optional<Policies> namespacePolicies = getNamespacePolicies();
initializeTopicDispatchRateLimiterIfNeeded(policies);
- dispatchRateLimiter.ifPresent(limiter -> {
- if (policies.isDispatchRateSet()) {
- dispatchRateLimiter.get().updateDispatchRate(policies.getDispatchRate());
- } else {
- dispatchRateLimiter.get().updateDispatchRate();
- }
- });
-
- List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>();
- subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(consumer -> {
- consumerCheckFutures.add(consumer.checkPermissionsAsync().thenRun(() -> {
- Dispatcher dispatcher = sub.getDispatcher();
- if (dispatcher != null) {
- dispatcher.updateRateLimiter();
- }
- }));
- }));
+ dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
- FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
+ updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
updatePublishDispatcher();
initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
if (this.subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
- subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
+ subscribeRateLimiter.onSubscribeRateUpdate(getSubscribeRate()));
}
replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
checkMessageExpiry();
@@ -3087,6 +3061,21 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return DispatchRateLimiter.getPolicies(brokerService, topic);
}
+ private CompletableFuture<Void> updateSubscriptionsDispatcherRateLimiter() {
+ List<CompletableFuture<Void>> subscriptionCheckFutures = new ArrayList<>((int) subscriptions.size());
+ subscriptions.forEach((subName, sub) -> {
+ List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>(sub.getConsumers().size());
+ sub.getConsumers().forEach(consumer -> consumerCheckFutures.add(consumer.checkPermissionsAsync()));
+ subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
+ Dispatcher dispatcher = sub.getDispatcher();
+ if (dispatcher != null) {
+ dispatcher.updateRateLimiter();
+ }
+ }));
+ });
+ return FutureUtil.waitForAll(subscriptionCheckFutures);
+ }
+
private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies policies) {
synchronized (dispatchRateLimiterLock) {
if (!dispatchRateLimiter.isPresent() && policies.getDispatchRate() != null) {
@@ -3106,6 +3095,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
} else if (!policies.get().isSubscribeRateSet()
|| policies.get().getSubscribeRate().subscribeThrottlingRatePerConsumer <= 0) {
+ subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
this.subscribeRateLimiter = Optional.empty();
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index 93707b4977b..89af6f6be88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -27,12 +27,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,22 +49,7 @@ public class SubscribeRateLimiter {
subscribeRateLimiter = new ConcurrentHashMap<>();
this.executorService = brokerService.pulsar().getExecutor();
// get subscribeRate from topic level policies
- this.subscribeRate = topic.getTopicPolicies()
- .map(TopicPolicies::getSubscribeRate)
- .orElse(null);
-
- // subscribeRate of topic level policies not set, get from zookeeper
- if (this.subscribeRate == null) {
- this.subscribeRate = getPoliciesSubscribeRate();
- }
-
- // get subscribeRate from broker.conf
- if (this.subscribeRate == null) {
- this.subscribeRate = new SubscribeRate(brokerService.pulsar()
- .getConfiguration().getSubscribeThrottlingRatePerConsumer(),
- brokerService.pulsar().getConfiguration().getSubscribeRatePeriodPerConsumerInSecond());
-
- }
+ this.subscribeRate = topic.getSubscribeRate();
if (isSubscribeRateEnabled(this.subscribeRate)) {
resetTask = createTask();
log.info("[{}] configured subscribe-dispatch rate at broker {}", this.topicName, subscribeRate);
@@ -157,40 +139,9 @@ public class SubscribeRateLimiter {
}
}
- public void onPoliciesUpdate(Policies data) {
- // if subscribe rate is set on topic policy, skip subscribe rate update
- SubscribeRate subscribeRate = brokerService.getTopicPolicies(TopicName.get(topicName))
- .map(TopicPolicies::getSubscribeRate)
- .orElse(null);
- if (subscribeRate != null) {
- return;
- }
-
- String cluster = brokerService.pulsar().getConfiguration().getClusterName();
-
- subscribeRate = data.clusterSubscribeRate.get(cluster);
-
- onSubscribeRateUpdate(subscribeRate);
-
- }
-
public void onSubscribeRateUpdate(SubscribeRate subscribeRate) {
- final SubscribeRate namespacePolicySubscribeRate = getPoliciesSubscribeRate();
- final SubscribeRate newSubscribeRate = new SubscribeRate(
- brokerService.pulsar().getConfiguration().getSubscribeThrottlingRatePerConsumer(),
- brokerService.pulsar().getConfiguration().getSubscribeRatePeriodPerConsumerInSecond()
- );
-
- // if policy-throttling rate is disabled and cluster-throttling is enabled then apply
- // cluster-throttling rate
- // if topic policy-throttling rate is disabled
- if (!isSubscribeRateEnabled(subscribeRate) && isSubscribeRateEnabled(namespacePolicySubscribeRate)) {
- subscribeRate = namespacePolicySubscribeRate;
- }
-
- if (!isSubscribeRateEnabled(subscribeRate) && !isSubscribeRateEnabled(namespacePolicySubscribeRate)
- && isSubscribeRateEnabled(newSubscribeRate)) {
- subscribeRate = newSubscribeRate;
+ if (this.subscribeRate.equals(subscribeRate)) {
+ return;
}
this.subscribeRate = subscribeRate;
stopResetTask();
@@ -216,23 +167,6 @@ public class SubscribeRateLimiter {
return getPoliciesSubscribeRate(brokerService, topicName);
}
- public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
- String topicName) {
- ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
- policies = policies.isPresent() ? policies : DispatchRateLimiter.getPolicies(brokerService, topicName);
- return isDispatchRateNeeded(serviceConfig, policies, topicName);
- }
-
- private static boolean isDispatchRateNeeded(final ServiceConfiguration serviceConfig,
- final Optional<Policies> policies, final String topicName) {
- SubscribeRate subscribeRate = getPoliciesSubscribeRate(serviceConfig.getClusterName(), policies, topicName);
- if (subscribeRate == null) {
- return serviceConfig.getSubscribeThrottlingRatePerConsumer() > 0
- && serviceConfig.getSubscribeRatePeriodPerConsumerInSecond() > 0;
- }
- return true;
- }
-
public static SubscribeRate getPoliciesSubscribeRate(BrokerService brokerService, final String topicName) {
final String cluster = brokerService.pulsar().getConfiguration().getClusterName();
final Optional<Policies> policies = DispatchRateLimiter.getPolicies(brokerService, topicName);
@@ -262,8 +196,8 @@ public class SubscribeRateLimiter {
!= null ? subscribeRateLimiter.get(consumerIdentifier).getRate() : -1;
}
- private static boolean isSubscribeRateEnabled(SubscribeRate subscribeRate) {
- return subscribeRate != null && (subscribeRate.subscribeThrottlingRatePerConsumer > 0);
+ public static boolean isSubscribeRateEnabled(SubscribeRate subscribeRate) {
+ return subscribeRate.subscribeThrottlingRatePerConsumer > 0 && subscribeRate.ratePeriodInSecond > 0;
}
public void close() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
index 4a40bc162de..82356055031 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.policies.data.PublishRate;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -140,4 +141,45 @@ public class PrecisTopicPublishRateThrottleTest extends BrokerTestBase{
Assert.assertNotNull(messageId);
super.internalCleanup();
}
+
+ @Test
+ public void testBrokerLevelPublishRateDynamicUpdate() throws Exception{
+ conf.setPreciseTopicPublishRateLimiterEnable(true);
+ conf.setTopicLevelPoliciesEnabled(true);
+ conf.setSystemTopicEnabled(true);
+ conf.setMaxPendingPublishRequestsPerConnection(0);
+ super.baseSetup();
+ final String topic = "persistent://prop/ns-abc/testMultiLevelPublishRate";
+ org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .producerName("producer-name")
+ .create();
+
+ final int rateInMsg = 10;
+ final long rateInByte = 20;
+
+ // maxPublishRatePerTopicInMessages
+ admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInMessages", "" + rateInMsg);
+ Awaitility.await()
+ .untilAsserted(() ->
+ Assert.assertEquals(admin.brokers().getAllDynamicConfigurations().get("maxPublishRatePerTopicInMessages"),
+ "" + rateInMsg));
+ Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
+ Assert.assertNotNull(topicRef);
+ PrecisPublishLimiter limiter = ((PrecisPublishLimiter) ((AbstractTopic) topicRef).topicPublishRateLimiter);
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(limiter.publishMaxMessageRate, rateInMsg));
+ Assert.assertEquals(limiter.publishMaxByteRate, 0);
+
+ // maxPublishRatePerTopicInBytes
+ admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInBytes", "" + rateInByte);
+ Awaitility.await()
+ .untilAsserted(() ->
+ Assert.assertEquals(admin.brokers().getAllDynamicConfigurations().get("maxPublishRatePerTopicInBytes"),
+ "" + rateInByte));
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(limiter.publishMaxByteRate, rateInByte));
+ Assert.assertEquals(limiter.publishMaxMessageRate, rateInMsg);
+
+ producer.close();
+ super.internalCleanup();
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 12a4f0db18d..3a38994fa25 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1393,6 +1393,7 @@ public class ServerCnxTest {
Policies policies = mock(Policies.class);
policies.encryption_required = true;
policies.topicDispatchRate = Maps.newHashMap();
+ policies.clusterSubscribeRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
policies.clusterDispatchRate = Maps.newHashMap();
@@ -1428,6 +1429,7 @@ public class ServerCnxTest {
Policies policies = mock(Policies.class);
policies.encryption_required = true;
policies.topicDispatchRate = Maps.newHashMap();
+ policies.clusterSubscribeRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
policies.clusterDispatchRate = Maps.newHashMap();
@@ -1468,6 +1470,7 @@ public class ServerCnxTest {
// Namespace policy doesn't require encryption
policies.encryption_required = false;
policies.topicDispatchRate = Maps.newHashMap();
+ policies.clusterSubscribeRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
policies.clusterDispatchRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
@@ -1504,6 +1507,7 @@ public class ServerCnxTest {
Policies policies = mock(Policies.class);
policies.encryption_required = true;
policies.topicDispatchRate = Maps.newHashMap();
+ policies.clusterSubscribeRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
policies.clusterDispatchRate = Maps.newHashMap();
@@ -1547,6 +1551,7 @@ public class ServerCnxTest {
Policies policies = mock(Policies.class);
policies.encryption_required = true;
policies.topicDispatchRate = Maps.newHashMap();
+ policies.clusterSubscribeRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
policies.clusterDispatchRate = Maps.newHashMap();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java
new file mode 100644
index 00000000000..547fbe354f1
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import java.util.Optional;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class SubscribeRateTest extends BrokerTestBase {
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.baseSetup();
+ conf.setTopicLevelPoliciesEnabled(true);
+ conf.setSystemTopicEnabled(true);
+ conf.setMaxPendingPublishRequestsPerConnection(0);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testBrokerLevelSubscribeRateDynamicUpdate() throws Exception {
+ final String topic = "persistent://prop/ns-abc/testBrokerLevelSubscribeRateDynamicUpdate";
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .producerName("producer-name")
+ .create();
+
+ Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
+ Assert.assertNotNull(topicRef);
+ Assert.assertFalse(topicRef.getSubscribeRateLimiter().isPresent());
+
+ final int ratePerConsumer = 10;
+ final int ratePeriod = 60;
+
+ String defaultRatePerConsumer = admin.brokers().getRuntimeConfigurations().get("subscribeThrottlingRatePerConsumer");
+ String defaultRatePeriod = admin.brokers().getRuntimeConfigurations().get("subscribeRatePeriodPerConsumerInSecond");
+ Assert.assertNotNull(defaultRatePerConsumer);
+ Assert.assertNotNull(defaultRatePeriod);
+ Assert.assertNotEquals(ratePerConsumer, Integer.parseInt(defaultRatePerConsumer));
+ Assert.assertNotEquals(ratePeriod, Integer.parseInt(defaultRatePeriod));
+
+ // subscribeThrottlingRatePerConsumer
+ admin.brokers().updateDynamicConfiguration("subscribeThrottlingRatePerConsumer", ratePerConsumer + "");
+ Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicRef.getSubscribeRateLimiter().isPresent()));
+ SubscribeRateLimiter limiter = topicRef.getSubscribeRateLimiter().get();
+ Assert.assertEquals(limiter.getSubscribeRate().subscribeThrottlingRatePerConsumer, ratePerConsumer);
+ Assert.assertEquals(limiter.getSubscribeRate().ratePeriodInSecond, 30);
+
+ // subscribeRatePeriodPerConsumerInSecond
+ admin.brokers().updateDynamicConfiguration("subscribeRatePeriodPerConsumerInSecond", ratePeriod + "");
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(limiter.getSubscribeRate().ratePeriodInSecond, ratePeriod));
+ Assert.assertEquals(limiter.getSubscribeRate().subscribeThrottlingRatePerConsumer, ratePerConsumer);
+
+ producer.close();
+ }
+
+ @Test
+ public void testUpdateSubscribeRateLimiter() throws Exception {
+
+ final String topic = "persistent://prop/ns-abc/testUpdateSubscribeRateLimiter";
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .producerName("producer-name")
+ .create();
+
+ Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
+ Assert.assertNotNull(topicRef);
+ Assert.assertTrue(topicRef instanceof PersistentTopic);
+ Assert.assertFalse(topicRef.getSubscribeRateLimiter().isPresent());
+
+ // init
+ PersistentTopic persistentTopic = spy(((PersistentTopic) topicRef));
+ when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(10, 60));
+ persistentTopic.updateSubscribeRateLimiter();
+
+ Optional<SubscribeRateLimiter> limiter1 = persistentTopic.getSubscribeRateLimiter();
+ Assert.assertTrue(limiter1.isPresent());
+ Assert.assertEquals(limiter1.get().getSubscribeRate(), new SubscribeRate(10, 60));
+
+ // update
+ when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(20, 120));
+ persistentTopic.updateSubscribeRateLimiter();
+
+ Optional<SubscribeRateLimiter> limiter2 = persistentTopic.getSubscribeRateLimiter();
+ Assert.assertTrue(limiter2.isPresent());
+ Assert.assertEquals(limiter2.get().getSubscribeRate(), new SubscribeRate(20, 120));
+
+ Assert.assertSame(limiter1, limiter2);
+
+ // disable
+ when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(0, 0));
+ persistentTopic.updateSubscribeRateLimiter();
+
+ Optional<SubscribeRateLimiter> limiter3 = persistentTopic.getSubscribeRateLimiter();
+ Assert.assertFalse(limiter3.isPresent());
+ }
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscribeRate.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscribeRate.java
index dc553c69267..ea641b44b2a 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscribeRate.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscribeRate.java
@@ -38,6 +38,16 @@ public class SubscribeRate {
this.ratePeriodInSecond = ratePeriodInSecond;
}
+ public static SubscribeRate normalize(SubscribeRate subscribeRate) {
+ if (subscribeRate != null
+ && subscribeRate.subscribeThrottlingRatePerConsumer > 0
+ && subscribeRate.ratePeriodInSecond > 0) {
+ return subscribeRate;
+ } else {
+ return null;
+ }
+ }
+
@Override
public int hashCode() {
return Objects.hash(subscribeThrottlingRatePerConsumer, ratePeriodInSecond);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 917b9753cc3..0532744bec3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -53,8 +53,10 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
final PolicyHierarchyValue<DispatchRateImpl> replicatorDispatchRate;
final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
+ final PolicyHierarchyValue<SubscribeRate> subscribeRate;
final PolicyHierarchyValue<DispatchRateImpl> subscriptionDispatchRate;
final PolicyHierarchyValue<SchemaCompatibilityStrategy> schemaCompatibilityStrategy;
+ final PolicyHierarchyValue<DispatchRateImpl> dispatchRate;
public HierarchyTopicPolicies() {
replicationClusters = new PolicyHierarchyValue<>();
@@ -80,7 +82,9 @@ public class HierarchyTopicPolicies {
delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
replicatorDispatchRate = new PolicyHierarchyValue<>();
compactionThreshold = new PolicyHierarchyValue<>();
+ subscribeRate = new PolicyHierarchyValue<>();
subscriptionDispatchRate = new PolicyHierarchyValue<>();
schemaCompatibilityStrategy = new PolicyHierarchyValue<>();
+ dispatchRate = new PolicyHierarchyValue<>();
}
}