You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2023/02/16 08:03:56 UTC
[pulsar] branch branch-2.10 updated: [fix][broker] Fix loadbalance score calculation problem (#19420)
This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new b4b664c5327 [fix][broker] Fix loadbalance score calculation problem (#19420)
b4b664c5327 is described below
commit b4b664c5327f5e4310ea35a36bc6caa34e96ce94
Author: gaozhangmin <zh...@apache.org>
AuthorDate: Wed Feb 15 17:53:37 2023 +0800
[fix][broker] Fix loadbalance score calculation problem (#19420)
(cherry picked from commit 456d1122525bb5a0b794a6f6f15313e40b886047)
---
.../loadbalance/impl/ModularLoadManagerImpl.java | 29 ++++++++++------------
1 file changed, 13 insertions(+), 16 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index c48b12b85c2..10b6cb86534 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.BrokerData;
@@ -64,6 +65,7 @@ import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLo
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -545,24 +547,19 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
// Remove all loaded bundles from the preallocated maps.
final Map<String, BundleData> preallocatedBundleData = brokerData.getPreallocatedBundleData();
+ Set<String> ownedNsBundles = pulsar.getNamespaceService().getOwnedServiceUnits()
+ .stream().map(NamespaceBundle::toString).collect(Collectors.toSet());
synchronized (preallocatedBundleData) {
- for (String preallocatedBundleName : brokerData.getPreallocatedBundleData().keySet()) {
- if (brokerData.getLocalData().getBundles().contains(preallocatedBundleName)) {
- final Iterator<Map.Entry<String, BundleData>> preallocatedIterator =
- preallocatedBundleData.entrySet()
- .iterator();
- while (preallocatedIterator.hasNext()) {
- final String bundle = preallocatedIterator.next().getKey();
-
- if (bundleData.containsKey(bundle)) {
- preallocatedIterator.remove();
- preallocatedBundleToBroker.remove(bundle);
- }
- }
+ preallocatedBundleToBroker.keySet().removeAll(preallocatedBundleData.keySet());
+ final Iterator<Map.Entry<String, BundleData>> preallocatedIterator =
+ preallocatedBundleData.entrySet().iterator();
+ while (preallocatedIterator.hasNext()) {
+ final String bundle = preallocatedIterator.next().getKey();
+ if (!ownedNsBundles.contains(bundle)
+ || (brokerData.getLocalData().getBundles().contains(bundle)
+ && bundleData.containsKey(bundle))) {
+ preallocatedIterator.remove();
}
-
- // This is needed too in case a broker which was assigned a bundle dies and comes back up.
- preallocatedBundleToBroker.remove(preallocatedBundleName);
}
}