You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/06/14 06:58:34 UTC
[pulsar] branch branch-2.10 updated: Optimize topic policy with HierarchyTopicPolicies about replicatorDispatchRate (#14161)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new c11202c827f Optimize topic policy with HierarchyTopicPolicies about replicatorDispatchRate (#14161)
c11202c827f is described below
commit c11202c827ff5ab22ed9cb58e133ef1ee9830803
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Mon Feb 28 09:46:06 2022 +0800
Optimize topic policy with HierarchyTopicPolicies about replicatorDispatchRate (#14161)
(cherry picked from commit 2b3e8aeb5a1c259e0325e5a91dc5d7e20c6ee569)
---
.../pulsar/broker/service/AbstractTopic.java | 30 ++++++++++++++++++++--
.../pulsar/broker/service/BrokerService.java | 12 +++++----
.../apache/pulsar/broker/service/Replicator.java | 3 +--
.../service/persistent/DispatchRateLimiter.java | 3 +++
.../service/persistent/PersistentReplicator.java | 9 +++----
.../broker/service/persistent/PersistentTopic.java | 2 +-
.../pulsar/broker/service/ServerCnxTest.java | 15 +++++++++++
.../policies/data/HierarchyTopicPolicies.java | 2 ++
8 files changed, 61 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 4ed89908651..286eccb8bd6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -161,6 +161,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
return this.topicPolicies.getSchemaCompatibilityStrategy().get();
}
+ public DispatchRateImpl getReplicatorDispatchRate() {
+ return this.topicPolicies.getReplicatorDispatchRate().get();
+ }
+
private SchemaCompatibilityStrategy formatSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
return strategy == SchemaCompatibilityStrategy.UNDEFINED ? null : strategy;
}
@@ -194,6 +198,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
+ topicPolicies.getReplicatorDispatchRate().updateTopicValue(normalize(data.getReplicatorDispatchRate()));
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
topicPolicies.getSubscriptionDispatchRate().updateTopicValue(normalize(data.getSubscriptionDispatchRate()));
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
@@ -233,6 +238,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
.map(DelayedDeliveryPolicies::getTickTime).orElse(null));
topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled));
+ updateNamespaceReplicatorDispatchRate(namespacePolicies,
+ brokerService.getPulsar().getConfig().getClusterName());
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
type -> this.topicPolicies.getBackLogQuotaMap().get(type)
.updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
@@ -246,6 +253,11 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
.updateNamespaceValue(normalize(namespacePolicies.subscriptionDispatchRate.get(cluster)));
}
+ private void updateNamespaceReplicatorDispatchRate(Policies namespacePolicies, String cluster) {
+ topicPolicies.getReplicatorDispatchRate()
+ .updateNamespaceValue(normalize(namespacePolicies.replicatorDispatchRate.get(cluster)));
+ }
+
private DispatchRateImpl normalize(DispatchRateImpl dispatchRate) {
if (dispatchRate != null
&& (dispatchRate.getDispatchThrottlingRateInMsg() > 0
@@ -315,6 +327,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
topicPolicies.getCompactionThreshold().updateBrokerValue(config.getBrokerServiceCompactionThresholdInBytes());
topicPolicies.getReplicationClusters().updateBrokerValue(Collections.emptyList());
SchemaCompatibilityStrategy schemaCompatibilityStrategy = config.getSchemaCompatibilityStrategy();
+ topicPolicies.getReplicatorDispatchRate().updateBrokerValue(replicatorDispatchRateInBroker(config));
if (isSystemTopic()) {
schemaCompatibilityStrategy = config.getSystemTopicSchemaCompatibilityStrategy();
}
@@ -331,6 +344,14 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
.build();
}
+ private DispatchRateImpl replicatorDispatchRateInBroker(ServiceConfiguration config) {
+ return DispatchRateImpl.builder()
+ .dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerReplicatorInMsg())
+ .dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerReplicatorInByte())
+ .ratePeriodInSecond(1)
+ .build();
+ }
+
private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionTypesEnabled) {
EnumSet<SubType> subTypes = EnumSet.noneOf(SubType.class);
for (String subTypeStr : CollectionUtils.emptyIfNull(getSubscriptionTypesEnabled)) {
@@ -1123,8 +1144,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
}
public void updateBrokerSubscriptionDispatchRate() {
- topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
- subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
+ topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
+ subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
}
public void addFilteredEntriesCount(int filtered) {
@@ -1134,4 +1155,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
public long getFilteredEntriesCount() {
return this.filteredEntriesCounter.longValue();
}
+
+ public void updateBrokerReplicatorDispatchRate() {
+ topicPolicies.getReplicatorDispatchRate().updateBrokerValue(
+ replicatorDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e05bd7bd9c2..cda3d90fd98 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2313,12 +2313,14 @@ public class BrokerService implements Closeable {
private void updateReplicatorMessageDispatchRate() {
this.pulsar().getExecutor().submit(() -> {
// update message-rate for each topic Replicator in Geo-replication
- forEachTopic(topic ->
- topic.getReplicators().forEach((name, persistentReplicator) -> {
- if (persistentReplicator.getRateLimiter().isPresent()) {
- persistentReplicator.getRateLimiter().get().updateDispatchRate();
+ forEachTopic(topic -> {
+ if (topic instanceof AbstractTopic) {
+ ((AbstractTopic) topic).updateBrokerReplicatorDispatchRate();
}
- }));
+ topic.getReplicators().forEach((name, persistentReplicator) ->
+ persistentReplicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate));
+ }
+ );
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 5f738ac1937..2cd6ec62327 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.service;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
-import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
public interface Replicator {
@@ -38,7 +37,7 @@ public interface Replicator {
String getRemoteCluster();
- default void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
+ default void initializeDispatchRateLimiterIfNeeded() {
//No-op
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 97a10631067..3247494577a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -173,6 +173,9 @@ public class DispatchRateLimiter {
case SUBSCRIPTION:
updateDispatchRate(topic.getSubscriptionDispatchRate());
return;
+ case REPLICATOR:
+ updateDispatchRate(topic.getReplicatorDispatchRate());
+ return;
}
Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 1f208bc768a..cc5410dbbeb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -57,7 +57,6 @@ import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.api.proto.MarkerType;
-import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.stats.Rate;
@@ -123,7 +122,7 @@ public class PersistentReplicator extends AbstractReplicator
readMaxSizeBytes = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadSizeBytes();
producerQueueThreshold = (int) (producerQueueSize * 0.9);
- this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
+ this.initializeDispatchRateLimiterIfNeeded();
startProducer();
}
@@ -705,9 +704,9 @@ public class PersistentReplicator extends AbstractReplicator
}
@Override
- public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
- if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
- .isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), Type.REPLICATOR)) {
+ public void initializeDispatchRateLimiterIfNeeded() {
+ if (!dispatchRateLimiter.isPresent()
+ && DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate())) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.REPLICATOR));
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 629158c4d50..27bb77fbc5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -395,7 +395,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
// dispatch rate limiter for each replicator
replicators.forEach((name, replicator) ->
- replicator.initializeDispatchRateLimiterIfNeeded(policies));
+ replicator.initializeDispatchRateLimiterIfNeeded());
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index afa5d1aad03..a7fbd070f1d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1397,6 +1397,9 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
policies.subscriptionDispatchRate = Maps.newHashMap();
+ // add `clusterDispatchRate` otherwise there will be a NPE
+ // `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
+ policies.replicatorDispatchRate = Maps.newHashMap();
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
@@ -1429,6 +1432,9 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
policies.subscriptionDispatchRate = Maps.newHashMap();
+ // add `clusterDispatchRate` otherwise there will be a NPE
+ // `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
+ policies.replicatorDispatchRate = Maps.newHashMap();
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
@@ -1465,6 +1471,9 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
policies.subscriptionDispatchRate = Maps.newHashMap();
+ // add `clusterDispatchRate` otherwise there will be a NPE
+ // `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
+ policies.replicatorDispatchRate = Maps.newHashMap();
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
@@ -1499,6 +1508,9 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
policies.subscriptionDispatchRate = Maps.newHashMap();
+ // add `clusterDispatchRate` otherwise there will be a NPE
+ // `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
+ policies.replicatorDispatchRate = Maps.newHashMap();
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
@@ -1539,6 +1551,9 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
policies.subscriptionDispatchRate = Maps.newHashMap();
+ // add `clusterDispatchRate` otherwise there will be a NPE
+ // `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
+ policies.replicatorDispatchRate = Maps.newHashMap();
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 8546c83f0e4..917b9753cc3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -51,6 +51,7 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<PublishRate> publishRate;
final PolicyHierarchyValue<Boolean> delayedDeliveryEnabled;
final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
+ final PolicyHierarchyValue<DispatchRateImpl> replicatorDispatchRate;
final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
final PolicyHierarchyValue<DispatchRateImpl> subscriptionDispatchRate;
final PolicyHierarchyValue<SchemaCompatibilityStrategy> schemaCompatibilityStrategy;
@@ -77,6 +78,7 @@ public class HierarchyTopicPolicies {
publishRate = new PolicyHierarchyValue<>();
delayedDeliveryEnabled = new PolicyHierarchyValue<>();
delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
+ replicatorDispatchRate = new PolicyHierarchyValue<>();
compactionThreshold = new PolicyHierarchyValue<>();
subscriptionDispatchRate = new PolicyHierarchyValue<>();
schemaCompatibilityStrategy = new PolicyHierarchyValue<>();