You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/24 13:01:05 UTC

[pulsar] branch master updated: Optimize topic policy with HierarchyTopicPolicies about dispatchRate (#14038)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 009da3f  Optimize topic policy with HierarchyTopicPolicies about dispatchRate (#14038)
009da3f is described below

commit 009da3f87d0ce64d9a254a422781e26e522c060e
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Thu Feb 24 20:59:30 2022 +0800

    Optimize topic policy with HierarchyTopicPolicies about dispatchRate (#14038)
---
 .../pulsar/broker/service/AbstractTopic.java       | 28 ++++++++++++++++++++++
 .../pulsar/broker/service/BrokerService.java       |  3 +++
 .../service/persistent/DispatchRateLimiter.java    |  3 +++
 .../broker/service/persistent/PersistentTopic.java | 22 ++++++-----------
 .../policies/data/HierarchyTopicPolicies.java      |  2 ++
 5 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index c94de2f..1bf53d9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -159,6 +159,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         return this.topicPolicies.getSchemaCompatibilityStrategy().get();
     }
 
+    public DispatchRateImpl getDispatchRate() {
+        return this.topicPolicies.getDispatchRate().get();
+    }
+
     private SchemaCompatibilityStrategy formatSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
         return strategy == SchemaCompatibilityStrategy.UNDEFINED ? null : strategy;
     }
@@ -195,6 +199,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
         topicPolicies.getSubscriptionDispatchRate().updateTopicValue(normalize(data.getSubscriptionDispatchRate()));
         topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
+        topicPolicies.getDispatchRate().updateTopicValue(normalize(data.getDispatchRate()));
     }
 
     protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
@@ -237,6 +242,15 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         updateNamespaceSubscriptionDispatchRate(namespacePolicies,
             brokerService.getPulsar().getConfig().getClusterName());
         updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
+        updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
+    }
+
+    private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
+        DispatchRateImpl dispatchRate = namespacePolicies.topicDispatchRate.get(cluster);
+        if (dispatchRate == null) {
+            dispatchRate = namespacePolicies.clusterDispatchRate.get(cluster);
+        }
+        topicPolicies.getDispatchRate().updateNamespaceValue(normalize(dispatchRate));
     }
 
     private void updateNamespaceSubscriptionDispatchRate(Policies namespacePolicies, String cluster) {
@@ -319,6 +333,15 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(subscriptionDispatchRateInBroker(config));
         topicPolicies.getSchemaCompatibilityStrategy()
                 .updateBrokerValue(formatSchemaCompatibilityStrategy(schemaCompatibilityStrategy));
+        topicPolicies.getDispatchRate().updateBrokerValue(dispatchRateInBroker(config));
+    }
+
+    private DispatchRateImpl dispatchRateInBroker(ServiceConfiguration config) {
+        return DispatchRateImpl.builder()
+                .dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerTopicInMsg())
+                .dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerTopicInByte())
+                .ratePeriodInSecond(1)
+                .build();
     }
 
     private DispatchRateImpl subscriptionDispatchRateInBroker(ServiceConfiguration config) {
@@ -1118,4 +1141,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
             subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
     }
+
+    public void updateBrokerDispatchRate() {
+        topicPolicies.getDispatchRate().updateBrokerValue(
+            dispatchRateInBroker(brokerService.pulsar().getConfiguration()));
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2b50365..a8bcb38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2237,6 +2237,9 @@ public class BrokerService implements Closeable {
         this.pulsar().getExecutor().execute(() -> {
             // update message-rate for each topic
             forEachTopic(topic -> {
+                if (topic instanceof AbstractTopic) {
+                    ((AbstractTopic) topic).updateBrokerDispatchRate();
+                }
                 if (topic.getDispatchRateLimiter().isPresent()) {
                     topic.getDispatchRateLimiter().get().updateDispatchRate();
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 97a1063..a5784aef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -170,6 +170,9 @@ public class DispatchRateLimiter {
      */
     public void updateDispatchRate() {
         switch (type) {
+            case TOPIC:
+                updateDispatchRate(topic.getDispatchRate());
+                return;
             case SUBSCRIPTION:
                 updateDispatchRate(topic.getSubscriptionDispatchRate());
                 return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a090048..6806cb7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -258,7 +258,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         this.replicators = new ConcurrentOpenHashMap<>(16, 1);
         this.backloggedCursorThresholdEntries =
                 brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
-        initializeRateLimiterIfNeeded(Optional.empty());
         registerTopicPolicyListener();
 
         this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
@@ -313,6 +312,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         isEncryptionRequired = false;
                         updatePublishDispatcher();
                         updateResourceGroupLimiter(optPolicies);
+                        initializeRateLimiterIfNeeded(Optional.empty());
                         return;
                     }
 
@@ -320,6 +320,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
                     this.updateTopicPolicyByNamespacePolicy(policies);
 
+                    initializeRateLimiterIfNeeded(Optional.empty());
+
                     updatePublishDispatcher();
 
                     updateResourceGroupLimiter(optPolicies);
@@ -361,8 +363,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     private void initializeRateLimiterIfNeeded(Optional<Policies> policies) {
         synchronized (dispatchRateLimiter) {
             // dispatch rate limiter for topic
-            if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
-                    .isDispatchRateNeeded(brokerService, policies, topic, Type.TOPIC)) {
+            if (!dispatchRateLimiter.isPresent()
+                && DispatchRateLimiter.isDispatchRateEnabled(topicPolicies.getDispatchRate().get())) {
                 this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
             }
             boolean isDispatchRateNeeded = SubscribeRateLimiter.isDispatchRateNeeded(brokerService, policies, topic);
@@ -2426,11 +2428,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
                 CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
                 // update rate-limiter if policies updated
-                if (this.dispatchRateLimiter.isPresent()) {
-                    if (!topicPolicies.isPresent() || !topicPolicies.get().isDispatchRateSet()) {
-                        dispatchRateLimiter.get().onPoliciesUpdate(data);
-                    }
-                }
+                dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
                 if (this.subscribeRateLimiter.isPresent()) {
                     subscribeRateLimiter.get().onPoliciesUpdate(data);
                 }
@@ -3026,13 +3024,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         Optional<Policies> namespacePolicies = getNamespacePolicies();
         initializeTopicDispatchRateLimiterIfNeeded(policies);
 
-        dispatchRateLimiter.ifPresent(limiter -> {
-            if (policies.isDispatchRateSet()) {
-                dispatchRateLimiter.get().updateDispatchRate(policies.getDispatchRate());
-            } else {
-                dispatchRateLimiter.get().updateDispatchRate();
-            }
-        });
+        dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
 
         List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>();
         subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(consumer -> {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 8546c83..9e86070 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -54,6 +54,7 @@ public class HierarchyTopicPolicies {
     final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
     final PolicyHierarchyValue<DispatchRateImpl> subscriptionDispatchRate;
     final PolicyHierarchyValue<SchemaCompatibilityStrategy> schemaCompatibilityStrategy;
+    final PolicyHierarchyValue<DispatchRateImpl> dispatchRate;
 
     public HierarchyTopicPolicies() {
         replicationClusters = new PolicyHierarchyValue<>();
@@ -80,5 +81,6 @@ public class HierarchyTopicPolicies {
         compactionThreshold = new PolicyHierarchyValue<>();
         subscriptionDispatchRate = new PolicyHierarchyValue<>();
         schemaCompatibilityStrategy = new PolicyHierarchyValue<>();
+        dispatchRate = new PolicyHierarchyValue<>();
     }
 }