You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/06/14 06:58:34 UTC

[pulsar] branch branch-2.10 updated: Optimize topic policy with HierarchyTopicPolicies about replicatorDispatchRate (#14161)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new c11202c827f Optimize topic policy with HierarchyTopicPolicies about replicatorDispatchRate (#14161)
c11202c827f is described below

commit c11202c827ff5ab22ed9cb58e133ef1ee9830803
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Mon Feb 28 09:46:06 2022 +0800

    Optimize topic policy with HierarchyTopicPolicies about replicatorDispatchRate (#14161)
    
    (cherry picked from commit 2b3e8aeb5a1c259e0325e5a91dc5d7e20c6ee569)
---
 .../pulsar/broker/service/AbstractTopic.java       | 30 ++++++++++++++++++++--
 .../pulsar/broker/service/BrokerService.java       | 12 +++++----
 .../apache/pulsar/broker/service/Replicator.java   |  3 +--
 .../service/persistent/DispatchRateLimiter.java    |  3 +++
 .../service/persistent/PersistentReplicator.java   |  9 +++----
 .../broker/service/persistent/PersistentTopic.java |  2 +-
 .../pulsar/broker/service/ServerCnxTest.java       | 15 +++++++++++
 .../policies/data/HierarchyTopicPolicies.java      |  2 ++
 8 files changed, 61 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 4ed89908651..286eccb8bd6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -161,6 +161,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         return this.topicPolicies.getSchemaCompatibilityStrategy().get();
     }
 
+    public DispatchRateImpl getReplicatorDispatchRate() {
+        return this.topicPolicies.getReplicatorDispatchRate().get();
+    }
+
     private SchemaCompatibilityStrategy formatSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
         return strategy == SchemaCompatibilityStrategy.UNDEFINED ? null : strategy;
     }
@@ -194,6 +198,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
         topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
         topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
+        topicPolicies.getReplicatorDispatchRate().updateTopicValue(normalize(data.getReplicatorDispatchRate()));
         topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
         topicPolicies.getSubscriptionDispatchRate().updateTopicValue(normalize(data.getSubscriptionDispatchRate()));
         topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
@@ -233,6 +238,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
                         .map(DelayedDeliveryPolicies::getTickTime).orElse(null));
         topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
                 subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled));
+        updateNamespaceReplicatorDispatchRate(namespacePolicies,
+            brokerService.getPulsar().getConfig().getClusterName());
         Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
                 type -> this.topicPolicies.getBackLogQuotaMap().get(type)
                         .updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
@@ -246,6 +253,11 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
             .updateNamespaceValue(normalize(namespacePolicies.subscriptionDispatchRate.get(cluster)));
     }
 
+    private void updateNamespaceReplicatorDispatchRate(Policies namespacePolicies, String cluster) {
+        topicPolicies.getReplicatorDispatchRate()
+            .updateNamespaceValue(normalize(namespacePolicies.replicatorDispatchRate.get(cluster)));
+    }
+
     private DispatchRateImpl normalize(DispatchRateImpl dispatchRate) {
         if (dispatchRate != null
             && (dispatchRate.getDispatchThrottlingRateInMsg() > 0
@@ -315,6 +327,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getCompactionThreshold().updateBrokerValue(config.getBrokerServiceCompactionThresholdInBytes());
         topicPolicies.getReplicationClusters().updateBrokerValue(Collections.emptyList());
         SchemaCompatibilityStrategy schemaCompatibilityStrategy = config.getSchemaCompatibilityStrategy();
+        topicPolicies.getReplicatorDispatchRate().updateBrokerValue(replicatorDispatchRateInBroker(config));
         if (isSystemTopic()) {
             schemaCompatibilityStrategy = config.getSystemTopicSchemaCompatibilityStrategy();
         }
@@ -331,6 +344,14 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
             .build();
     }
 
+    private DispatchRateImpl replicatorDispatchRateInBroker(ServiceConfiguration config) {
+        return DispatchRateImpl.builder()
+            .dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerReplicatorInMsg())
+            .dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerReplicatorInByte())
+            .ratePeriodInSecond(1)
+            .build();
+    }
+
     private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionTypesEnabled) {
         EnumSet<SubType> subTypes = EnumSet.noneOf(SubType.class);
         for (String subTypeStr : CollectionUtils.emptyIfNull(getSubscriptionTypesEnabled)) {
@@ -1123,8 +1144,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
     }
 
     public void updateBrokerSubscriptionDispatchRate() {
-        topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
-            subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
+                topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
+                    subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
     }
 
     public void addFilteredEntriesCount(int filtered) {
@@ -1134,4 +1155,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
     public long getFilteredEntriesCount() {
         return this.filteredEntriesCounter.longValue();
     }
+
+    public void updateBrokerReplicatorDispatchRate() {
+        topicPolicies.getReplicatorDispatchRate().updateBrokerValue(
+            replicatorDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e05bd7bd9c2..cda3d90fd98 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2313,12 +2313,14 @@ public class BrokerService implements Closeable {
     private void updateReplicatorMessageDispatchRate() {
         this.pulsar().getExecutor().submit(() -> {
             // update message-rate for each topic Replicator in Geo-replication
-            forEachTopic(topic ->
-                topic.getReplicators().forEach((name, persistentReplicator) -> {
-                    if (persistentReplicator.getRateLimiter().isPresent()) {
-                        persistentReplicator.getRateLimiter().get().updateDispatchRate();
+            forEachTopic(topic -> {
+                    if (topic instanceof AbstractTopic) {
+                        ((AbstractTopic) topic).updateBrokerReplicatorDispatchRate();
                     }
-                }));
+                    topic.getReplicators().forEach((name, persistentReplicator) ->
+                        persistentReplicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate));
+                }
+            );
         });
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 5f738ac1937..2cd6ec62327 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.service;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
-import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
 
 public interface Replicator {
@@ -38,7 +37,7 @@ public interface Replicator {
 
     String getRemoteCluster();
 
-    default void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
+    default void initializeDispatchRateLimiterIfNeeded() {
         //No-op
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 97a10631067..3247494577a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -173,6 +173,9 @@ public class DispatchRateLimiter {
             case SUBSCRIPTION:
                 updateDispatchRate(topic.getSubscriptionDispatchRate());
                 return;
+            case REPLICATOR:
+                updateDispatchRate(topic.getReplicatorDispatchRate());
+                return;
         }
 
         Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 1f208bc768a..cc5410dbbeb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -57,7 +57,6 @@ import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.SendCallback;
 import org.apache.pulsar.common.api.proto.MarkerType;
-import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.stats.Rate;
@@ -123,7 +122,7 @@ public class PersistentReplicator extends AbstractReplicator
         readMaxSizeBytes = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadSizeBytes();
         producerQueueThreshold = (int) (producerQueueSize * 0.9);
 
-        this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
+        this.initializeDispatchRateLimiterIfNeeded();
 
         startProducer();
     }
@@ -705,9 +704,9 @@ public class PersistentReplicator extends AbstractReplicator
     }
 
     @Override
-    public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
-        if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
-            .isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), Type.REPLICATOR)) {
+    public void initializeDispatchRateLimiterIfNeeded() {
+        if (!dispatchRateLimiter.isPresent()
+            && DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate())) {
             this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.REPLICATOR));
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 629158c4d50..27bb77fbc5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -395,7 +395,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
             // dispatch rate limiter for each replicator
             replicators.forEach((name, replicator) ->
-                replicator.initializeDispatchRateLimiterIfNeeded(policies));
+                replicator.initializeDispatchRateLimiterIfNeeded());
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index afa5d1aad03..a7fbd070f1d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1397,6 +1397,9 @@ public class ServerCnxTest {
         // add `clusterDispatchRate` otherwise there will be a NPE
         // `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
         policies.subscriptionDispatchRate = Maps.newHashMap();
+        // add `clusterDispatchRate` otherwise there will be a NPE
+        // `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
+        policies.replicatorDispatchRate = Maps.newHashMap();
         doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
                 .getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
 
@@ -1429,6 +1432,9 @@ public class ServerCnxTest {
         // add `clusterDispatchRate` otherwise there will be a NPE
         // `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
         policies.subscriptionDispatchRate = Maps.newHashMap();
+        // add `clusterDispatchRate` otherwise there will be a NPE
+        // `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
+        policies.replicatorDispatchRate = Maps.newHashMap();
         doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
                 .getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
 
@@ -1465,6 +1471,9 @@ public class ServerCnxTest {
         // add `clusterDispatchRate` otherwise there will be a NPE
         // `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
         policies.subscriptionDispatchRate = Maps.newHashMap();
+        // add `clusterDispatchRate` otherwise there will be a NPE
+        // `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
+        policies.replicatorDispatchRate = Maps.newHashMap();
         doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
                 .getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
 
@@ -1499,6 +1508,9 @@ public class ServerCnxTest {
         // add `clusterDispatchRate` otherwise there will be a NPE
         // `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
         policies.subscriptionDispatchRate = Maps.newHashMap();
+        // add `clusterDispatchRate` otherwise there will be a NPE
+        // `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
+        policies.replicatorDispatchRate = Maps.newHashMap();
         doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
                 .getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
 
@@ -1539,6 +1551,9 @@ public class ServerCnxTest {
         // add `clusterDispatchRate` otherwise there will be a NPE
         // `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
         policies.subscriptionDispatchRate = Maps.newHashMap();
+        // add `clusterDispatchRate` otherwise there will be a NPE
+        // `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
+        policies.replicatorDispatchRate = Maps.newHashMap();
         doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
                 .getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 8546c83f0e4..917b9753cc3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -51,6 +51,7 @@ public class HierarchyTopicPolicies {
     final PolicyHierarchyValue<PublishRate> publishRate;
     final PolicyHierarchyValue<Boolean> delayedDeliveryEnabled;
     final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
+    final PolicyHierarchyValue<DispatchRateImpl> replicatorDispatchRate;
     final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
     final PolicyHierarchyValue<DispatchRateImpl> subscriptionDispatchRate;
     final PolicyHierarchyValue<SchemaCompatibilityStrategy> schemaCompatibilityStrategy;
@@ -77,6 +78,7 @@ public class HierarchyTopicPolicies {
         publishRate = new PolicyHierarchyValue<>();
         delayedDeliveryEnabled = new PolicyHierarchyValue<>();
         delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
+        replicatorDispatchRate = new PolicyHierarchyValue<>();
         compactionThreshold = new PolicyHierarchyValue<>();
         subscriptionDispatchRate = new PolicyHierarchyValue<>();
         schemaCompatibilityStrategy = new PolicyHierarchyValue<>();