You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/09/15 12:31:13 UTC

[pulsar] branch master updated: Fix bug that fails to search namespace bundle due to NPE (#5191)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 412c8fd  Fix bug that fails to search namespace bundle due to NPE (#5191)
412c8fd is described below

commit 412c8fdb5ea61593017c395a7c8e291c922d6bf5
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Sun Sep 15 21:31:05 2019 +0900

    Fix bug that fails to search namespace bundle due to NPE (#5191)
    
    Fixes #5176
    
    ### Motivation
    
    As mentioned in #5176, NPE may occur in the load manager and fail to search for namespace bundles. This is because multiple threads may update a map named `brokerToNamespaceToBundleRange` at the same time.
    
    ### Modifications
    
    - Changed `brokerToNamespaceToBundleRange` to ConcurrentOpenHashMap instead of HashMap which is not thread safe.
    - Fixed `LoadManagerShared.removeMostServicingBrokersForNamespace()` logic so that NPE does not occur.
---
 .../broker/loadbalance/impl/LoadManagerShared.java | 66 ++++++++---------
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 18 +++--
 .../loadbalance/impl/SimpleLoadManagerImpl.java    | 15 ++--
 .../AntiAffinityNamespaceGroupTest.java            | 17 +++--
 .../loadbalance/impl/LoadManagerSharedTest.java    | 86 ++++++++++++++++++++++
 5 files changed, 151 insertions(+), 51 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 6a5575c..d9cab23 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.loadbalance.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.web.PulsarWebResource.path;
+import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 
 import java.io.IOException;
 import java.net.MalformedURLException;
@@ -41,12 +42,13 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
 import org.apache.pulsar.broker.loadbalance.LoadData;
-import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.slf4j.Logger;
@@ -195,11 +197,12 @@ public class LoadManagerShared {
      * @param target
      *            Map to fill.
      */
-    public static void fillNamespaceToBundlesMap(final Set<String> bundles, final Map<String, Set<String>> target) {
+    public static void fillNamespaceToBundlesMap(final Set<String> bundles,
+            final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> target) {
         bundles.forEach(bundleName -> {
             final String namespaceName = getNamespaceNameFromBundleName(bundleName);
             final String bundleRange = getBundleRangeFromBundleName(bundleName);
-            target.computeIfAbsent(namespaceName, k -> new HashSet<>()).add(bundleRange);
+            target.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).add(bundleRange);
         });
     }
 
@@ -258,41 +261,32 @@ public class LoadManagerShared {
      *            Map from brokers to namespaces to bundle ranges.
      */
     public static void removeMostServicingBrokersForNamespace(final String assignedBundleName,
-            final Set<String> candidates, final Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange) {
+            final Set<String> candidates,
+            final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange) {
         if (candidates.isEmpty()) {
             return;
         }
+
         final String namespaceName = getNamespaceNameFromBundleName(assignedBundleName);
         int leastBundles = Integer.MAX_VALUE;
+
         for (final String broker : candidates) {
-            if (brokerToNamespaceToBundleRange.containsKey(broker)) {
-                final Set<String> bundleRanges = brokerToNamespaceToBundleRange.get(broker).get(namespaceName);
-                if (bundleRanges == null) {
-                    // Assume that when the namespace is absent, there are no bundles for this namespace assigned to
-                    // that broker.
-                    leastBundles = 0;
-                    break;
-                }
-                leastBundles = Math.min(leastBundles, bundleRanges.size());
-            } else {
-                // Assume non-present brokers have 0 bundles.
-                leastBundles = 0;
+            int bundles = (int) brokerToNamespaceToBundleRange
+                    .computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
+                    .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).size();
+            leastBundles = Math.min(leastBundles, bundles);
+            if (leastBundles == 0) {
                 break;
             }
         }
-        if (leastBundles == 0) {
-            // By assumption, the namespace name will not be present if there are no bundles in the namespace
-            // assigned to the broker.
-            candidates.removeIf(broker -> brokerToNamespaceToBundleRange.containsKey(broker)
-                    && brokerToNamespaceToBundleRange.get(broker).containsKey(namespaceName));
-        } else {
-            final int finalLeastBundles = leastBundles;
-            // We may safely assume that each broker has at least one bundle for this namespace.
-            // Note that this case is far less likely since it implies that there are at least as many bundles for this
-            // namespace as brokers.
-            candidates.removeIf(broker -> brokerToNamespaceToBundleRange.get(broker).get(namespaceName)
-                    .size() != finalLeastBundles);
-        }
+
+        // Since `brokerToNamespaceToBundleRange` can be updated by other threads,
+        // `leastBundles` may differ from the actual value.
+
+        final int finalLeastBundles = leastBundles;
+        candidates.removeIf(
+                broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
+                        .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).size() > finalLeastBundles);
     }
 
     /**
@@ -324,7 +318,8 @@ public class LoadManagerShared {
      * @param brokerToNamespaceToBundleRange
      */
     public static void filterAntiAffinityGroupOwnedBrokers(final PulsarService pulsar, final String assignedBundleName,
-            final Set<String> candidates, final Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange,
+            final Set<String> candidates,
+            final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange,
             Map<String, String> brokerToDomainMap) {
         if (candidates.isEmpty()) {
             return;
@@ -424,8 +419,8 @@ public class LoadManagerShared {
      * @return
      */
     public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOwnedBrokers(
-            final PulsarService pulsar, String namespaceName,
-            Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange) {
+            final PulsarService pulsar, final String namespaceName,
+            final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange) {
 
         CompletableFuture<Map<String, Integer>> antiAffinityNsBrokersResult = new CompletableFuture<>();
         ZooKeeperDataCache<Policies> policiesCache = pulsar.getConfigurationCache().policiesCache();
@@ -440,6 +435,10 @@ public class LoadManagerShared {
             final List<CompletableFuture<Void>> futures = Lists.newArrayList();
             brokerToNamespaceToBundleRange.forEach((broker, nsToBundleRange) -> {
                 nsToBundleRange.forEach((ns, bundleRange) -> {
+                    if (bundleRange.isEmpty()) {
+                        return;
+                    }
+
                     CompletableFuture<Void> future = new CompletableFuture<>();
                     futures.add(future);
                     policiesCache.getAsync(path(POLICIES, ns)).thenAccept(nsPolicies -> {
@@ -481,7 +480,8 @@ public class LoadManagerShared {
      * @throws Exception
      */
     public static boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker,
-            final PulsarService pulsar, Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange,
+            final PulsarService pulsar,
+            final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange,
             Set<String> candidateBroekrs) throws Exception {
 
         Map<String, Integer> brokerNamespaceCount = getAntiAffinityNamespaceOwnedBrokers(pulsar, namespace,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 7f05c99..0b17479 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -69,6 +69,8 @@ import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
@@ -125,7 +127,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
 
     // Map from brokers to namespaces to the bundle ranges in that namespace assigned to that broker.
     // Used to distribute bundles within a namespace evely across brokers.
-    private final Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange;
+    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange;
 
     // Path to the ZNode containing the LocalBrokerData json for this broker.
     private String brokerZnodePath;
@@ -189,7 +191,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
      */
     public ModularLoadManagerImpl() {
         brokerCandidateCache = new HashSet<>();
-        brokerToNamespaceToBundleRange = new HashMap<>();
+        brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
         defaultStats = new NamespaceBundleStats();
         filterPipeline = new ArrayList<>();
         loadData = new LoadData();
@@ -544,8 +546,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
 
             // Using the newest data, update the aggregated time-average data for the current broker.
             brokerData.getTimeAverageData().reset(statsMap.keySet(), bundleData, defaultStats);
-            final Map<String, Set<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange
-                    .computeIfAbsent(broker, k -> new HashMap<>());
+            final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange
+                    .computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>());
             synchronized (namespaceToBundleRange) {
                 namespaceToBundleRange.clear();
                 LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), namespaceToBundleRange);
@@ -769,8 +771,12 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
 
             final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
             final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
-            brokerToNamespaceToBundleRange.get(broker.get()).computeIfAbsent(namespaceName, k -> new HashSet<>())
-                    .add(bundleRange);
+            final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange
+                    .computeIfAbsent(broker.get(), k -> new ConcurrentOpenHashMap<>());
+            synchronized (namespaceToBundleRange) {
+                namespaceToBundleRange.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>())
+                        .add(bundleRange);
+            }
             return broker;
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 9bb64eb..3610209 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -54,6 +54,8 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUnitRanking;
@@ -113,7 +115,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
 
     // Map from brokers to namespaces to the bundle ranges in that namespace assigned to that broker.
     // Used to distribute bundles within a namespace evely across brokers.
-    private final Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange;
+    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange;
 
     // CPU usage per msg/sec
     private double realtimeCpuLoadFactor = 0.025;
@@ -199,7 +201,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
         bundleLossesCache = new HashSet<>();
         brokerCandidateCache = new HashSet<>();
         availableBrokersCache = new HashSet<>();
-        brokerToNamespaceToBundleRange = new HashMap<>();
+        brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
         this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
             @Override
             public boolean isEnablePersistentTopics(String brokerUrl) {
@@ -899,8 +901,9 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
                 // Add preallocated bundle range so incoming bundles from the same namespace are not assigned to the
                 // same broker.
                 brokerToNamespaceToBundleRange
-                        .computeIfAbsent(selectedRU.getResourceId().replace("http://", ""), k -> new HashMap<>())
-                        .computeIfAbsent(namespaceName, k -> new HashSet<>()).add(bundleRange);
+                        .computeIfAbsent(selectedRU.getResourceId().replace("http://", ""),
+                                k -> new ConcurrentOpenHashMap<>())
+                        .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).add(bundleRange);
                 ranking.addPreAllocatedServiceUnit(serviceUnitId, quota);
                 resourceUnitRankings.put(selectedRU, ranking);
             }
@@ -1322,8 +1325,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
             final String broker = resourceUnit.getResourceId();
             final Set<String> loadedBundles = ranking.getLoadedBundles();
             final Set<String> preallocatedBundles = resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles();
-            final Map<String, Set<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange
-                    .computeIfAbsent(broker.replace("http://", ""), k -> new HashMap<>());
+            final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange
+                    .computeIfAbsent(broker.replace("http://", ""), k -> new ConcurrentOpenHashMap<>());
             namespaceToBundleRange.clear();
             LoadManagerShared.fillNamespaceToBundlesMap(loadedBundles, namespaceToBundleRange);
             LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundles, namespaceToBundleRange);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 2c33538..4059b18 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -52,6 +52,8 @@ import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -236,7 +238,7 @@ public class AntiAffinityNamespaceGroupTest {
         brokerToDomainMap.put("brokerName-3", "domain-1");
 
         Set<String> candidate = Sets.newHashSet();
-        Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange = Maps.newHashMap();
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
 
         assertEquals(brokers.size(), totalBrokers);
 
@@ -322,7 +324,7 @@ public class AntiAffinityNamespaceGroupTest {
 
         Set<String> brokers = Sets.newHashSet();
         Set<String> candidate = Sets.newHashSet();
-        Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange = Maps.newHashMap();
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
         brokers.add("broker-0");
         brokers.add("broker-1");
         brokers.add("broker-2");
@@ -366,10 +368,13 @@ public class AntiAffinityNamespaceGroupTest {
         assertEquals(candidate.size(), 3);
     }
 
-    private void selectBrokerForNamespace(Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange,
+    private void selectBrokerForNamespace(
+            ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange,
             String broker, String namespace, String assignedBundleName) {
-        Map<String, Set<String>> nsToBundleMap = Maps.newHashMap();
-        nsToBundleMap.put(namespace, Sets.newHashSet(assignedBundleName));
+        ConcurrentOpenHashSet<String> bundleSet = new ConcurrentOpenHashSet<>();
+        bundleSet.add(assignedBundleName);
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> nsToBundleMap = new ConcurrentOpenHashMap<>();
+        nsToBundleMap.put(namespace, bundleSet);
         brokerToNamespaceToBundleRange.put(broker, nsToBundleMap);
     }
 
@@ -458,7 +463,7 @@ public class AntiAffinityNamespaceGroupTest {
 
         Set<String> brokers = Sets.newHashSet();
         Set<String> candidate = Sets.newHashSet();
-        Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange = Maps.newHashMap();
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
         brokers.add("broker-0");
         brokers.add("broker-1");
         brokers.add("broker-2");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
new file mode 100644
index 0000000..b0df1bf
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class LoadManagerSharedTest {
+
+    @Test
+    public void testRemoveMostServicingBrokersForNamespace() throws Exception {
+        String namespace = "tenant1/ns1";
+        String assignedBundle = namespace + "/0x00000000_0x40000000";
+
+        Set<String> candidates = Sets.newHashSet();
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> map = new ConcurrentOpenHashMap<>();
+        LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
+        Assert.assertEquals(candidates.size(), 0);
+
+        candidates = Sets.newHashSet("broker1");
+        LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
+        Assert.assertEquals(candidates.size(), 1);
+        Assert.assertTrue(candidates.contains("broker1"));
+
+        candidates = Sets.newHashSet("broker1");
+        fillBrokerToNamespaceToBundleMap(map, "broker1", namespace, "0x40000000_0x80000000");
+        LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
+        Assert.assertEquals(candidates.size(), 1);
+        Assert.assertTrue(candidates.contains("broker1"));
+
+        candidates = Sets.newHashSet("broker1", "broker2");
+        LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
+        Assert.assertEquals(candidates.size(), 1);
+        Assert.assertTrue(candidates.contains("broker2"));
+
+        candidates = Sets.newHashSet("broker1", "broker2");
+        fillBrokerToNamespaceToBundleMap(map, "broker2", namespace, "0x80000000_0xc0000000");
+        LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
+        Assert.assertEquals(candidates.size(), 2);
+        Assert.assertTrue(candidates.contains("broker1"));
+        Assert.assertTrue(candidates.contains("broker2"));
+
+        candidates = Sets.newHashSet("broker1", "broker2");
+        fillBrokerToNamespaceToBundleMap(map, "broker2", namespace, "0xc0000000_0xd0000000");
+        LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
+        Assert.assertEquals(candidates.size(), 1);
+        Assert.assertTrue(candidates.contains("broker1"));
+
+        candidates = Sets.newHashSet("broker1", "broker2", "broker3");
+        fillBrokerToNamespaceToBundleMap(map, "broker3", namespace, "0xd0000000_0xffffffff");
+        LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
+        Assert.assertEquals(candidates.size(), 2);
+        Assert.assertTrue(candidates.contains("broker1"));
+        Assert.assertTrue(candidates.contains("broker3"));
+    }
+
+    private static void fillBrokerToNamespaceToBundleMap(
+            ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> map,
+            String broker, String namespace, String bundle) {
+        map.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
+                .computeIfAbsent(namespace, k -> new ConcurrentOpenHashSet<>()).add(bundle);
+    }
+
+}