You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2023/02/16 08:03:56 UTC

[pulsar] branch branch-2.10 updated: [fix][broker] Fix loadbalance score calculation problem (#19420)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new b4b664c5327 [fix][broker] Fix loadbalance score calculation problem (#19420)
b4b664c5327 is described below

commit b4b664c5327f5e4310ea35a36bc6caa34e96ce94
Author: gaozhangmin <zh...@apache.org>
AuthorDate: Wed Feb 15 17:53:37 2023 +0800

    [fix][broker] Fix loadbalance score calculation problem (#19420)
    
    (cherry picked from commit 456d1122525bb5a0b794a6f6f15313e40b886047)
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 29 ++++++++++------------
 1 file changed, 13 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index c48b12b85c2..10b6cb86534 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.pulsar.broker.BrokerData;
@@ -64,6 +65,7 @@ import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLo
 import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -545,24 +547,19 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
 
             // Remove all loaded bundles from the preallocated maps.
             final Map<String, BundleData> preallocatedBundleData = brokerData.getPreallocatedBundleData();
+            Set<String> ownedNsBundles = pulsar.getNamespaceService().getOwnedServiceUnits()
+                    .stream().map(NamespaceBundle::toString).collect(Collectors.toSet());
             synchronized (preallocatedBundleData) {
-                for (String preallocatedBundleName : brokerData.getPreallocatedBundleData().keySet()) {
-                    if (brokerData.getLocalData().getBundles().contains(preallocatedBundleName)) {
-                        final Iterator<Map.Entry<String, BundleData>> preallocatedIterator =
-                                preallocatedBundleData.entrySet()
-                                        .iterator();
-                        while (preallocatedIterator.hasNext()) {
-                            final String bundle = preallocatedIterator.next().getKey();
-
-                            if (bundleData.containsKey(bundle)) {
-                                preallocatedIterator.remove();
-                                preallocatedBundleToBroker.remove(bundle);
-                            }
-                        }
+                preallocatedBundleToBroker.keySet().removeAll(preallocatedBundleData.keySet());
+                final Iterator<Map.Entry<String, BundleData>> preallocatedIterator =
+                        preallocatedBundleData.entrySet().iterator();
+                while (preallocatedIterator.hasNext()) {
+                    final String bundle = preallocatedIterator.next().getKey();
+                    if (!ownedNsBundles.contains(bundle)
+                            || (brokerData.getLocalData().getBundles().contains(bundle)
+                            && bundleData.containsKey(bundle))) {
+                        preallocatedIterator.remove();
                     }
-
-                    // This is needed too in case a broker which was assigned a bundle dies and comes back up.
-                    preallocatedBundleToBroker.remove(preallocatedBundleName);
                 }
             }