You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/29 04:23:22 UTC
[pulsar] branch master updated: [feature][broker] add config maxUnloadBundleNumPerShedding for UniformLoadShedder (#16409)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0f2060d0d28 [feature][broker] add config maxUnloadBundleNumPerShedding for UniformLoadShedder (#16409)
0f2060d0d28 is described below
commit 0f2060d0d2815ae5691b7dcee44878bfb3a9db82
Author: LinChen <15...@qq.com>
AuthorDate: Fri Jul 29 12:23:15 2022 +0800
[feature][broker] add config maxUnloadBundleNumPerShedding for UniformLoadShedder (#16409)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 8 +++++
.../loadbalance/impl/UniformLoadShedder.java | 4 +++
.../loadbalance/impl/UniformLoadShedderTest.java | 40 ++++++++++++++++++++++
3 files changed, 52 insertions(+)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 91925bc85e8..a6bd9dde24f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2184,6 +2184,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private double loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold = 4;
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "For each uniform balanced unload, the maximum number of bundles that can be unloaded."
+ + " The default value is -1, which means no limit"
+ )
+ private int maxUnloadBundleNumPerShedding = -1; // -1 is the exact sentinel UniformLoadShedder compares against before applying the cap
+
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
index 486f5e21a81..c31f9e14196 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
@@ -140,6 +140,10 @@ public class UniformLoadShedder implements LoadSheddingStrategy {
return Triple.of(bundle, bundleData, throughput);
}).filter(e -> !recentlyUnloadedBundles.containsKey(e.getLeft()))
.sorted((e1, e2) -> Double.compare(e2.getRight(), e1.getRight())).forEach((e) -> {
+ if (conf.getMaxUnloadBundleNumPerShedding() != -1
+ && selectedBundlesCache.size() >= conf.getMaxUnloadBundleNumPerShedding()) {
+ return;
+ }
String bundle = e.getLeft();
BundleData bundleData = e.getMiddle();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java
index 7affb52c2e4..7498386c113 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.policies.data.loadbalancer.*;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@Test(groups = "broker")
@@ -41,6 +42,45 @@ public class UniformLoadShedderTest {
uniformLoadShedder = new UniformLoadShedder();
}
+ @Test
+ public void testMaxUnloadBundleNumPerShedding(){ // verifies that the per-shedding cap bounds the number of bundles selected
+ conf.setMaxUnloadBundleNumPerShedding(2); // cap under test: at most two bundles per shedding pass
+ int numBundles = 20; // far more candidates than the cap allows
+ LoadData loadData = new LoadData();
+
+ LocalBrokerData broker1 = new LocalBrokerData(); // receives every bundle and all of the throughput below
+ LocalBrokerData broker2 = new LocalBrokerData(); // left untouched: this test assigns it no bundles and no throughput
+
+ String broker2Name = "broker2";
+
+ double brokerThroughput = 0; // running total of the per-bundle throughput placed on broker1
+
+ for (int i = 1; i <= numBundles; ++i) {
+ broker1.getBundles().add("bundle-" + i);
+
+ BundleData bundle = new BundleData();
+
+ TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData();
+
+ double throughput = 1 * 1024 * 1024; // identical value for every bundle, used for both in and out
+ timeAverageMessageData.setMsgThroughputIn(throughput);
+ timeAverageMessageData.setMsgThroughputOut(throughput);
+ bundle.setShortTermData(timeAverageMessageData);
+ loadData.getBundleData().put("bundle-" + i, bundle); // same key as the name added to broker1's bundle set
+
+ brokerThroughput += throughput;
+ }
+
+ broker1.setMsgThroughputIn(brokerThroughput);
+ broker1.setMsgThroughputOut(brokerThroughput);
+
+ loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
+ loadData.getBrokerData().put(broker2Name, new BrokerData(broker2));
+
+ Multimap<String, String> bundlesToUnload = uniformLoadShedder.findBundlesForUnloading(loadData, conf);
+ assertEquals(bundlesToUnload.size(),2); // 20 candidates were offered; the result must match the cap of 2 set at the top
+ }
+
@Test
public void testBrokerWithMultipleBundles() {
int numBundles = 10;