You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/29 03:35:21 UTC

[pulsar] branch master updated: [improve][broker] Add dynamic configuration to UniformLoadShedder (#16391)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d64e2e6668 [improve][broker] Add dynamic configuration to UniformLoadShedder (#16391)
4d64e2e6668 is described below

commit 4d64e2e66689381ebbb94fbfc03eb4e1dfba0405
Author: LinChen <15...@qq.com>
AuthorDate: Fri Jul 29 11:35:15 2022 +0800

    [improve][broker] Add dynamic configuration to UniformLoadShedder (#16391)
---
 .../apache/pulsar/broker/ServiceConfiguration.java  | 21 +++++++++++++++++++++
 .../broker/loadbalance/impl/UniformLoadShedder.java | 14 +++++---------
 2 files changed, 26 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 178e047a32d..91925bc85e8 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2144,6 +2144,27 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private int loadBalancerAverageResourceUsageDifferenceThresholdPercentage = 10;
 
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "In the UniformLoadShedder strategy, the minimum message that triggers unload."
+    )
+    private int minUnloadMessage = 1000;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "In the UniformLoadShedder strategy, the minimum throughput that triggers unload."
+    )
+    private int minUnloadMessageThroughput = 1 * 1024 * 1024;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "In the UniformLoadShedder strategy, the maximum unload ratio."
+    )
+    private double maxUnloadPercentage = 0.2;
+
     @FieldContext(
         dynamic = true,
         category = CATEGORY_LOAD_BALANCER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
index 3d400b07cae..486f5e21a81 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
@@ -44,11 +44,6 @@ import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
  */
 @Slf4j
 public class UniformLoadShedder implements LoadSheddingStrategy {
-
-    private static final int MB = 1024 * 1024;
-    private static final double MAX_UNLOAD_PERCENTAGE = 0.2;
-    private static final int MIN_UNLOAD_MESSAGE = 1000;
-    private static final int MIN_UNLOAD_THROUGHPUT = 1 * MB;
     private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
     private static final double EPS = 1e-6;
 
@@ -122,14 +117,15 @@ public class UniformLoadShedder implements LoadSheddingStrategy {
                         underloadedBroker.getValue(), minMsgRate.getValue(), minThroughputRate.getValue());
             }
             MutableInt msgRateRequiredFromUnloadedBundles = new MutableInt(
-                    (int) ((maxMsgRate.getValue() - minMsgRate.getValue()) * MAX_UNLOAD_PERCENTAGE));
+                    (int) ((maxMsgRate.getValue() - minMsgRate.getValue()) * conf.getMaxUnloadPercentage()));
             MutableInt msgThroughputRequiredFromUnloadedBundles = new MutableInt(
-                    (int) ((maxThroughputRate.getValue() - minThroughputRate.getValue()) * MAX_UNLOAD_PERCENTAGE));
+                    (int) ((maxThroughputRate.getValue() - minThroughputRate.getValue())
+                            * conf.getMaxUnloadPercentage()));
             LocalBrokerData overloadedBrokerData = brokersData.get(overloadedBroker.getValue()).getLocalData();
 
             if (overloadedBrokerData.getBundles().size() > 1
-                && (msgRateRequiredFromUnloadedBundles.getValue() >= MIN_UNLOAD_MESSAGE
-                    || msgThroughputRequiredFromUnloadedBundles.getValue() >= MIN_UNLOAD_THROUGHPUT)) {
+                && (msgRateRequiredFromUnloadedBundles.getValue() >= conf.getMinUnloadMessage()
+                    || msgThroughputRequiredFromUnloadedBundles.getValue() >= conf.getMinUnloadMessageThroughput())) {
                 // Sort bundles by throughput, then pick the bundle which can help to reduce load uniformly with
                 // under-loaded broker
                 loadBundleData.entrySet().stream()