You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/25 04:20:54 UTC
[pulsar] 06/12: [fix][broker] The configuration loadBalancerNamespaceMaximumBundles is invalid (#16552)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a0133e91c5bede8ebebe4f4263109a93930662e7
Author: LinChen <15...@qq.com>
AuthorDate: Mon Jul 18 22:13:06 2022 +0800
[fix][broker] The configuration loadBalancerNamespaceMaximumBundles is invalid (#16552)
(cherry picked from commit 5698b08d57f5497b355aa61ac33e7f1303f1ca8e)
---
.../loadbalance/impl/BundleSplitterTask.java | 12 ++++-
.../loadbalance/impl/BundleSplitterTaskTest.java | 52 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
index 751203ca124..2203dbafc37 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -39,12 +40,16 @@ public class BundleSplitterTask implements BundleSplitStrategy {
private static final Logger log = LoggerFactory.getLogger(BundleSplitStrategy.class);
private final Set<String> bundleCache;
+ private final Map<String, Integer> namespaceBundleCount;
+
+
/**
* Construct a BundleSplitterTask.
*
*/
public BundleSplitterTask() {
bundleCache = new HashSet<>();
+ namespaceBundleCount = new HashMap<>();
}
/**
@@ -61,12 +66,14 @@ public class BundleSplitterTask implements BundleSplitStrategy {
@Override
public Set<String> findBundlesToSplit(final LoadData loadData, final PulsarService pulsar) {
bundleCache.clear();
+ namespaceBundleCount.clear();
final ServiceConfiguration conf = pulsar.getConfiguration();
int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles();
long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics();
long maxBundleSessions = conf.getLoadBalancerNamespaceBundleMaxSessions();
long maxBundleMsgRate = conf.getLoadBalancerNamespaceBundleMaxMsgRate();
long maxBundleBandwidth = conf.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * LoadManagerShared.MIBI;
+
loadData.getBrokerData().forEach((broker, brokerData) -> {
LocalBrokerData localData = brokerData.getLocalData();
for (final Map.Entry<String, NamespaceBundleStats> entry : localData.getLastStats().entrySet()) {
@@ -91,8 +98,11 @@ public class BundleSplitterTask implements BundleSplitStrategy {
try {
final int bundleCount = pulsar.getNamespaceService()
.getBundleCount(NamespaceName.get(namespace));
- if (bundleCount < maxBundleCount) {
+ if ((bundleCount + namespaceBundleCount.getOrDefault(namespace, 0))
+ < maxBundleCount) {
bundleCache.add(bundle);
+ int bundleNum = namespaceBundleCount.getOrDefault(namespace, 0);
+ namespaceBundleCount.put(namespace, bundleNum + 1);
} else {
log.warn(
"Could not split namespace bundle {} because namespace {} has too many bundles: {}",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
index 7480989bbb5..9ff266ba96c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -94,6 +95,57 @@ public class BundleSplitterTaskTest {
Assert.assertEquals(bundlesToSplit.size(), 0);
}
+ @Test
+ public void testLoadBalancerNamespaceMaximumBundles() throws Exception {
+ pulsar.getConfiguration().setLoadBalancerNamespaceMaximumBundles(3);
+
+ final BundleSplitterTask bundleSplitterTask = new BundleSplitterTask();
+ LoadData loadData = new LoadData();
+
+ LocalBrokerData brokerData = new LocalBrokerData();
+ Map<String, NamespaceBundleStats> lastStats = new HashMap<>();
+ final NamespaceBundleStats namespaceBundleStats = new NamespaceBundleStats();
+ namespaceBundleStats.topics = 5;
+ lastStats.put("ten/ns/0x00000000_0x20000000", namespaceBundleStats);
+
+ final NamespaceBundleStats namespaceBundleStats2 = new NamespaceBundleStats();
+ namespaceBundleStats2.topics = 5;
+ lastStats.put("ten/ns/0x20000000_0x40000000", namespaceBundleStats2);
+
+ final NamespaceBundleStats namespaceBundleStats3 = new NamespaceBundleStats();
+ namespaceBundleStats3.topics = 5;
+ lastStats.put("ten/ns/0x40000000_0x60000000", namespaceBundleStats3);
+
+ brokerData.setLastStats(lastStats);
+ loadData.getBrokerData().put("broker", new BrokerData(brokerData));
+
+ BundleData bundleData1 = new BundleData();
+ TimeAverageMessageData averageMessageData1 = new TimeAverageMessageData();
+ averageMessageData1.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2);
+ averageMessageData1.setMsgRateOut(1);
+ bundleData1.setLongTermData(averageMessageData1);
+ loadData.getBundleData().put("ten/ns/0x00000000_0x20000000", bundleData1);
+
+ BundleData bundleData2 = new BundleData();
+ TimeAverageMessageData averageMessageData2 = new TimeAverageMessageData();
+ averageMessageData2.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2);
+ averageMessageData2.setMsgRateOut(1);
+ bundleData2.setLongTermData(averageMessageData2);
+ loadData.getBundleData().put("ten/ns/0x20000000_0x40000000", bundleData2);
+
+ BundleData bundleData3 = new BundleData();
+ TimeAverageMessageData averageMessageData3 = new TimeAverageMessageData();
+ averageMessageData3.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2);
+ averageMessageData3.setMsgRateOut(1);
+ bundleData3.setLongTermData(averageMessageData3);
+ loadData.getBundleData().put("ten/ns/0x40000000_0x60000000", bundleData3);
+
+ int currentBundleCount = pulsar.getNamespaceService().getBundleCount(NamespaceName.get("ten/ns"));
+ final Set<String> bundlesToSplit = bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
+ Assert.assertEquals(bundlesToSplit.size() + currentBundleCount,
+ pulsar.getConfiguration().getLoadBalancerNamespaceMaximumBundles());
+ }
+
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {