You are viewing a plain-text version of this content; the canonical link was not preserved in this copy.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/29 03:35:21 UTC
[pulsar] branch master updated: [improve][broker] Add dynamic configuration to UniformLoadShedder (#16391)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4d64e2e6668 [improve][broker] Add dynamic configuration to UniformLoadShedder (#16391)
4d64e2e6668 is described below
commit 4d64e2e66689381ebbb94fbfc03eb4e1dfba0405
Author: LinChen <15...@qq.com>
AuthorDate: Fri Jul 29 11:35:15 2022 +0800
[improve][broker] Add dynamic configuration to UniformLoadShedder (#16391)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 21 +++++++++++++++++++++
.../broker/loadbalance/impl/UniformLoadShedder.java | 14 +++++---------
2 files changed, 26 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 178e047a32d..91925bc85e8 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2144,6 +2144,27 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int loadBalancerAverageResourceUsageDifferenceThresholdPercentage = 10;
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "In the UniformLoadShedder strategy, the minimum message that triggers unload."
+ )
+ private int minUnloadMessage = 1000;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "In the UniformLoadShedder strategy, the minimum throughput that triggers unload."
+ )
+ private int minUnloadMessageThroughput = 1 * 1024 * 1024;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "In the UniformLoadShedder strategy, the maximum unload ratio."
+ )
+ private double maxUnloadPercentage = 0.2;
+
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
index 3d400b07cae..486f5e21a81 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
@@ -44,11 +44,6 @@ import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
*/
@Slf4j
public class UniformLoadShedder implements LoadSheddingStrategy {
-
- private static final int MB = 1024 * 1024;
- private static final double MAX_UNLOAD_PERCENTAGE = 0.2;
- private static final int MIN_UNLOAD_MESSAGE = 1000;
- private static final int MIN_UNLOAD_THROUGHPUT = 1 * MB;
private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
private static final double EPS = 1e-6;
@@ -122,14 +117,15 @@ public class UniformLoadShedder implements LoadSheddingStrategy {
underloadedBroker.getValue(), minMsgRate.getValue(), minThroughputRate.getValue());
}
MutableInt msgRateRequiredFromUnloadedBundles = new MutableInt(
- (int) ((maxMsgRate.getValue() - minMsgRate.getValue()) * MAX_UNLOAD_PERCENTAGE));
+ (int) ((maxMsgRate.getValue() - minMsgRate.getValue()) * conf.getMaxUnloadPercentage()));
MutableInt msgThroughputRequiredFromUnloadedBundles = new MutableInt(
- (int) ((maxThroughputRate.getValue() - minThroughputRate.getValue()) * MAX_UNLOAD_PERCENTAGE));
+ (int) ((maxThroughputRate.getValue() - minThroughputRate.getValue())
+ * conf.getMaxUnloadPercentage()));
LocalBrokerData overloadedBrokerData = brokersData.get(overloadedBroker.getValue()).getLocalData();
if (overloadedBrokerData.getBundles().size() > 1
- && (msgRateRequiredFromUnloadedBundles.getValue() >= MIN_UNLOAD_MESSAGE
- || msgThroughputRequiredFromUnloadedBundles.getValue() >= MIN_UNLOAD_THROUGHPUT)) {
+ && (msgRateRequiredFromUnloadedBundles.getValue() >= conf.getMinUnloadMessage()
+ || msgThroughputRequiredFromUnloadedBundles.getValue() >= conf.getMinUnloadMessageThroughput())) {
// Sort bundles by throughput, then pick the bundle which can help to reduce load uniformly with
// under-loaded broker
loadBundleData.entrySet().stream()