You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/25 04:20:54 UTC

[pulsar] 06/12: [fix][broker] The configuration loadBalancerNamespaceMaximumBundles is invalid (#16552)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a0133e91c5bede8ebebe4f4263109a93930662e7
Author: LinChen <15...@qq.com>
AuthorDate: Mon Jul 18 22:13:06 2022 +0800

    [fix][broker] The configuration loadBalancerNamespaceMaximumBundles is invalid (#16552)
    
    (cherry picked from commit 5698b08d57f5497b355aa61ac33e7f1303f1ca8e)
---
 .../loadbalance/impl/BundleSplitterTask.java       | 12 ++++-
 .../loadbalance/impl/BundleSplitterTaskTest.java   | 52 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
index 751203ca124..2203dbafc37 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -39,12 +40,16 @@ public class BundleSplitterTask implements BundleSplitStrategy {
     private static final Logger log = LoggerFactory.getLogger(BundleSplitStrategy.class);
     private final Set<String> bundleCache;
 
+    private final Map<String, Integer> namespaceBundleCount;
+
+
     /**
      * Construct a BundleSplitterTask.
      *
      */
     public BundleSplitterTask() {
         bundleCache = new HashSet<>();
+        namespaceBundleCount = new HashMap<>();
     }
 
     /**
@@ -61,12 +66,14 @@ public class BundleSplitterTask implements BundleSplitStrategy {
     @Override
     public Set<String> findBundlesToSplit(final LoadData loadData, final PulsarService pulsar) {
         bundleCache.clear();
+        namespaceBundleCount.clear();
         final ServiceConfiguration conf = pulsar.getConfiguration();
         int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles();
         long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics();
         long maxBundleSessions = conf.getLoadBalancerNamespaceBundleMaxSessions();
         long maxBundleMsgRate = conf.getLoadBalancerNamespaceBundleMaxMsgRate();
         long maxBundleBandwidth = conf.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * LoadManagerShared.MIBI;
+
         loadData.getBrokerData().forEach((broker, brokerData) -> {
             LocalBrokerData localData = brokerData.getLocalData();
             for (final Map.Entry<String, NamespaceBundleStats> entry : localData.getLastStats().entrySet()) {
@@ -91,8 +98,11 @@ public class BundleSplitterTask implements BundleSplitStrategy {
                     try {
                         final int bundleCount = pulsar.getNamespaceService()
                                 .getBundleCount(NamespaceName.get(namespace));
-                        if (bundleCount < maxBundleCount) {
+                        if ((bundleCount + namespaceBundleCount.getOrDefault(namespace, 0))
+                                < maxBundleCount) {
                             bundleCache.add(bundle);
+                            int bundleNum = namespaceBundleCount.getOrDefault(namespace, 0);
+                            namespaceBundleCount.put(namespace, bundleNum + 1);
                         } else {
                             log.warn(
                                     "Could not split namespace bundle {} because namespace {} has too many bundles: {}",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
index 7480989bbb5..9ff266ba96c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.TimeAverageMessageData;
 import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -94,6 +95,57 @@ public class BundleSplitterTaskTest {
         Assert.assertEquals(bundlesToSplit.size(), 0);
     }
 
+    @Test
+    public void testLoadBalancerNamespaceMaximumBundles() throws Exception {
+        pulsar.getConfiguration().setLoadBalancerNamespaceMaximumBundles(3);
+
+        final BundleSplitterTask bundleSplitterTask = new BundleSplitterTask();
+        LoadData loadData = new LoadData();
+
+        LocalBrokerData brokerData = new LocalBrokerData();
+        Map<String, NamespaceBundleStats> lastStats = new HashMap<>();
+        final NamespaceBundleStats namespaceBundleStats = new NamespaceBundleStats();
+        namespaceBundleStats.topics = 5;
+        lastStats.put("ten/ns/0x00000000_0x20000000", namespaceBundleStats);
+
+        final NamespaceBundleStats namespaceBundleStats2 = new NamespaceBundleStats();
+        namespaceBundleStats2.topics = 5;
+        lastStats.put("ten/ns/0x20000000_0x40000000", namespaceBundleStats2);
+
+        final NamespaceBundleStats namespaceBundleStats3 = new NamespaceBundleStats();
+        namespaceBundleStats3.topics = 5;
+        lastStats.put("ten/ns/0x40000000_0x60000000", namespaceBundleStats3);
+
+        brokerData.setLastStats(lastStats);
+        loadData.getBrokerData().put("broker", new BrokerData(brokerData));
+
+        BundleData bundleData1 = new BundleData();
+        TimeAverageMessageData averageMessageData1 = new TimeAverageMessageData();
+        averageMessageData1.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2);
+        averageMessageData1.setMsgRateOut(1);
+        bundleData1.setLongTermData(averageMessageData1);
+        loadData.getBundleData().put("ten/ns/0x00000000_0x20000000", bundleData1);
+
+        BundleData bundleData2 = new BundleData();
+        TimeAverageMessageData averageMessageData2 = new TimeAverageMessageData();
+        averageMessageData2.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2);
+        averageMessageData2.setMsgRateOut(1);
+        bundleData2.setLongTermData(averageMessageData2);
+        loadData.getBundleData().put("ten/ns/0x20000000_0x40000000", bundleData2);
+
+        BundleData bundleData3 = new BundleData();
+        TimeAverageMessageData averageMessageData3 = new TimeAverageMessageData();
+        averageMessageData3.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2);
+        averageMessageData3.setMsgRateOut(1);
+        bundleData3.setLongTermData(averageMessageData3);
+        loadData.getBundleData().put("ten/ns/0x40000000_0x60000000", bundleData3);
+
+        int currentBundleCount = pulsar.getNamespaceService().getBundleCount(NamespaceName.get("ten/ns"));
+        final Set<String> bundlesToSplit = bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
+        Assert.assertEquals(bundlesToSplit.size() + currentBundleCount,
+                pulsar.getConfiguration().getLoadBalancerNamespaceMaximumBundles());
+    }
+
 
     @AfterMethod(alwaysRun = true)
     void shutdown() throws Exception {