You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/29 04:23:22 UTC

[pulsar] branch master updated: [feature][broker] add config maxUnloadBundleNumPerShedding for UniformLoadShedder (#16409)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f2060d0d28 [feature][broker] add config maxUnloadBundleNumPerShedding for UniformLoadShedder (#16409)
0f2060d0d28 is described below

commit 0f2060d0d2815ae5691b7dcee44878bfb3a9db82
Author: LinChen <15...@qq.com>
AuthorDate: Fri Jul 29 12:23:15 2022 +0800

    [feature][broker] add config maxUnloadBundleNumPerShedding for UniformLoadShedder (#16409)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  8 +++++
 .../loadbalance/impl/UniformLoadShedder.java       |  4 +++
 .../loadbalance/impl/UniformLoadShedderTest.java   | 40 ++++++++++++++++++++++
 3 files changed, 52 insertions(+)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 91925bc85e8..a6bd9dde24f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2184,6 +2184,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private double loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold = 4;
 
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "For each uniform balanced unload, the maximum number of bundles that can be unloaded."
+                    + " The default value is -1, which means no limit"
+    )
+    private int maxUnloadBundleNumPerShedding = -1;
+
     @FieldContext(
             dynamic = true,
             category = CATEGORY_LOAD_BALANCER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
index 486f5e21a81..c31f9e14196 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
@@ -140,6 +140,10 @@ public class UniformLoadShedder implements LoadSheddingStrategy {
                             return Triple.of(bundle, bundleData, throughput);
                         }).filter(e -> !recentlyUnloadedBundles.containsKey(e.getLeft()))
                         .sorted((e1, e2) -> Double.compare(e2.getRight(), e1.getRight())).forEach((e) -> {
+                            if (conf.getMaxUnloadBundleNumPerShedding() != -1
+                                    && selectedBundlesCache.size() >= conf.getMaxUnloadBundleNumPerShedding()) {
+                                return;
+                            }
                             String bundle = e.getLeft();
                             BundleData bundleData = e.getMiddle();
                             TimeAverageMessageData shortTermData = bundleData.getShortTermData();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java
index 7affb52c2e4..7498386c113 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.broker.loadbalance.LoadData;
 import org.apache.pulsar.policies.data.loadbalancer.*;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 
 @Test(groups = "broker")
@@ -41,6 +42,45 @@ public class UniformLoadShedderTest {
         uniformLoadShedder = new UniformLoadShedder();
     }
 
+    @Test
+    public void testMaxUnloadBundleNumPerShedding(){
+        conf.setMaxUnloadBundleNumPerShedding(2);
+        int numBundles = 20;
+        LoadData loadData = new LoadData();
+
+        LocalBrokerData broker1 = new LocalBrokerData();
+        LocalBrokerData broker2 = new LocalBrokerData();
+
+        String broker2Name = "broker2";
+
+        double brokerThroughput = 0;
+
+        for (int i = 1; i <= numBundles; ++i) {
+            broker1.getBundles().add("bundle-" + i);
+
+            BundleData bundle = new BundleData();
+
+            TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData();
+
+            double throughput = 1 * 1024 * 1024;
+            timeAverageMessageData.setMsgThroughputIn(throughput);
+            timeAverageMessageData.setMsgThroughputOut(throughput);
+            bundle.setShortTermData(timeAverageMessageData);
+            loadData.getBundleData().put("bundle-" + i, bundle);
+
+            brokerThroughput += throughput;
+        }
+
+        broker1.setMsgThroughputIn(brokerThroughput);
+        broker1.setMsgThroughputOut(brokerThroughput);
+
+        loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+        loadData.getBrokerData().put(broker2Name, new BrokerData(broker2));
+
+        Multimap<String, String> bundlesToUnload = uniformLoadShedder.findBundlesForUnloading(loadData, conf);
+        assertEquals(bundlesToUnload.size(),2);
+    }
+
     @Test
     public void testBrokerWithMultipleBundles() {
         int numBundles = 10;