You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/09/15 12:31:13 UTC
[pulsar] branch master updated: Fix bug that fails to search
namespace bundle due to NPE (#5191)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 412c8fd Fix bug that fails to search namespace bundle due to NPE (#5191)
412c8fd is described below
commit 412c8fdb5ea61593017c395a7c8e291c922d6bf5
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Sun Sep 15 21:31:05 2019 +0900
Fix bug that fails to search namespace bundle due to NPE (#5191)
Fixes #5176
### Motivation
As mentioned in #5176, NPE may occur in the load manager and fail to search for namespace bundles. This is because multiple threads may update a map named `brokerToNamespaceToBundleRange` at the same time.
### Modifications
- Changed `brokerToNamespaceToBundleRange` to ConcurrentOpenHashMap instead of HashMap which is not thread safe.
- Fixed `LoadManagerShared.removeMostServicingBrokersForNamespace()` logic so that NPE does not occur.
---
.../broker/loadbalance/impl/LoadManagerShared.java | 66 ++++++++---------
.../loadbalance/impl/ModularLoadManagerImpl.java | 18 +++--
.../loadbalance/impl/SimpleLoadManagerImpl.java | 15 ++--
.../AntiAffinityNamespaceGroupTest.java | 17 +++--
.../loadbalance/impl/LoadManagerSharedTest.java | 86 ++++++++++++++++++++++
5 files changed, 151 insertions(+), 51 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 6a5575c..d9cab23 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.loadbalance.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;
+import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
import java.io.IOException;
import java.net.MalformedURLException;
@@ -41,12 +42,13 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.LoadData;
-import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
@@ -195,11 +197,12 @@ public class LoadManagerShared {
* @param target
* Map to fill.
*/
- public static void fillNamespaceToBundlesMap(final Set<String> bundles, final Map<String, Set<String>> target) {
+ public static void fillNamespaceToBundlesMap(final Set<String> bundles,
+ final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> target) {
bundles.forEach(bundleName -> {
final String namespaceName = getNamespaceNameFromBundleName(bundleName);
final String bundleRange = getBundleRangeFromBundleName(bundleName);
- target.computeIfAbsent(namespaceName, k -> new HashSet<>()).add(bundleRange);
+ target.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).add(bundleRange);
});
}
@@ -258,41 +261,32 @@ public class LoadManagerShared {
* Map from brokers to namespaces to bundle ranges.
*/
public static void removeMostServicingBrokersForNamespace(final String assignedBundleName,
- final Set<String> candidates, final Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange) {
+ final Set<String> candidates,
+ final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange) {
if (candidates.isEmpty()) {
return;
}
+
final String namespaceName = getNamespaceNameFromBundleName(assignedBundleName);
int leastBundles = Integer.MAX_VALUE;
+
for (final String broker : candidates) {
- if (brokerToNamespaceToBundleRange.containsKey(broker)) {
- final Set<String> bundleRanges = brokerToNamespaceToBundleRange.get(broker).get(namespaceName);
- if (bundleRanges == null) {
- // Assume that when the namespace is absent, there are no bundles for this namespace assigned to
- // that broker.
- leastBundles = 0;
- break;
- }
- leastBundles = Math.min(leastBundles, bundleRanges.size());
- } else {
- // Assume non-present brokers have 0 bundles.
- leastBundles = 0;
+ int bundles = (int) brokerToNamespaceToBundleRange
+ .computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
+ .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).size();
+ leastBundles = Math.min(leastBundles, bundles);
+ if (leastBundles == 0) {
break;
}
}
- if (leastBundles == 0) {
- // By assumption, the namespace name will not be present if there are no bundles in the namespace
- // assigned to the broker.
- candidates.removeIf(broker -> brokerToNamespaceToBundleRange.containsKey(broker)
- && brokerToNamespaceToBundleRange.get(broker).containsKey(namespaceName));
- } else {
- final int finalLeastBundles = leastBundles;
- // We may safely assume that each broker has at least one bundle for this namespace.
- // Note that this case is far less likely since it implies that there are at least as many bundles for this
- // namespace as brokers.
- candidates.removeIf(broker -> brokerToNamespaceToBundleRange.get(broker).get(namespaceName)
- .size() != finalLeastBundles);
- }
+
+ // Since `brokerToNamespaceToBundleRange` can be updated by other threads,
+ // `leastBundles` may differ from the actual value.
+
+ final int finalLeastBundles = leastBundles;
+ candidates.removeIf(
+ broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
+ .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).size() > finalLeastBundles);
}
/**
@@ -324,7 +318,8 @@ public class LoadManagerShared {
* @param brokerToNamespaceToBundleRange
*/
public static void filterAntiAffinityGroupOwnedBrokers(final PulsarService pulsar, final String assignedBundleName,
- final Set<String> candidates, final Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange,
+ final Set<String> candidates,
+ final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange,
Map<String, String> brokerToDomainMap) {
if (candidates.isEmpty()) {
return;
@@ -424,8 +419,8 @@ public class LoadManagerShared {
* @return
*/
public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOwnedBrokers(
- final PulsarService pulsar, String namespaceName,
- Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange) {
+ final PulsarService pulsar, final String namespaceName,
+ final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange) {
CompletableFuture<Map<String, Integer>> antiAffinityNsBrokersResult = new CompletableFuture<>();
ZooKeeperDataCache<Policies> policiesCache = pulsar.getConfigurationCache().policiesCache();
@@ -440,6 +435,10 @@ public class LoadManagerShared {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
brokerToNamespaceToBundleRange.forEach((broker, nsToBundleRange) -> {
nsToBundleRange.forEach((ns, bundleRange) -> {
+ if (bundleRange.isEmpty()) {
+ return;
+ }
+
CompletableFuture<Void> future = new CompletableFuture<>();
futures.add(future);
policiesCache.getAsync(path(POLICIES, ns)).thenAccept(nsPolicies -> {
@@ -481,7 +480,8 @@ public class LoadManagerShared {
* @throws Exception
*/
public static boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker,
- final PulsarService pulsar, Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange,
+ final PulsarService pulsar,
+ final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange,
Set<String> candidateBroekrs) throws Exception {
Map<String, Integer> brokerNamespaceCount = getAntiAffinityNamespaceOwnedBrokers(pulsar, namespace,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 7f05c99..0b17479 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -69,6 +69,8 @@ import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
@@ -125,7 +127,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// Map from brokers to namespaces to the bundle ranges in that namespace assigned to that broker.
// Used to distribute bundles within a namespace evely across brokers.
- private final Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange;
+ private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange;
// Path to the ZNode containing the LocalBrokerData json for this broker.
private String brokerZnodePath;
@@ -189,7 +191,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
*/
public ModularLoadManagerImpl() {
brokerCandidateCache = new HashSet<>();
- brokerToNamespaceToBundleRange = new HashMap<>();
+ brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
defaultStats = new NamespaceBundleStats();
filterPipeline = new ArrayList<>();
loadData = new LoadData();
@@ -544,8 +546,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// Using the newest data, update the aggregated time-average data for the current broker.
brokerData.getTimeAverageData().reset(statsMap.keySet(), bundleData, defaultStats);
- final Map<String, Set<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange
- .computeIfAbsent(broker, k -> new HashMap<>());
+ final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange
+ .computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.clear();
LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), namespaceToBundleRange);
@@ -769,8 +771,12 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
- brokerToNamespaceToBundleRange.get(broker.get()).computeIfAbsent(namespaceName, k -> new HashSet<>())
- .add(bundleRange);
+ final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange
+ .computeIfAbsent(broker.get(), k -> new ConcurrentOpenHashMap<>());
+ synchronized (namespaceToBundleRange) {
+ namespaceToBundleRange.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>())
+ .add(bundleRange);
+ }
return broker;
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 9bb64eb..3610209 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -54,6 +54,8 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUnitRanking;
@@ -113,7 +115,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
// Map from brokers to namespaces to the bundle ranges in that namespace assigned to that broker.
// Used to distribute bundles within a namespace evely across brokers.
- private final Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange;
+ private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange;
// CPU usage per msg/sec
private double realtimeCpuLoadFactor = 0.025;
@@ -199,7 +201,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
bundleLossesCache = new HashSet<>();
brokerCandidateCache = new HashSet<>();
availableBrokersCache = new HashSet<>();
- brokerToNamespaceToBundleRange = new HashMap<>();
+ brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
@@ -899,8 +901,9 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
// Add preallocated bundle range so incoming bundles from the same namespace are not assigned to the
// same broker.
brokerToNamespaceToBundleRange
- .computeIfAbsent(selectedRU.getResourceId().replace("http://", ""), k -> new HashMap<>())
- .computeIfAbsent(namespaceName, k -> new HashSet<>()).add(bundleRange);
+ .computeIfAbsent(selectedRU.getResourceId().replace("http://", ""),
+ k -> new ConcurrentOpenHashMap<>())
+ .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).add(bundleRange);
ranking.addPreAllocatedServiceUnit(serviceUnitId, quota);
resourceUnitRankings.put(selectedRU, ranking);
}
@@ -1322,8 +1325,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
final String broker = resourceUnit.getResourceId();
final Set<String> loadedBundles = ranking.getLoadedBundles();
final Set<String> preallocatedBundles = resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles();
- final Map<String, Set<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange
- .computeIfAbsent(broker.replace("http://", ""), k -> new HashMap<>());
+ final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange
+ .computeIfAbsent(broker.replace("http://", ""), k -> new ConcurrentOpenHashMap<>());
namespaceToBundleRange.clear();
LoadManagerShared.fillNamespaceToBundlesMap(loadedBundles, namespaceToBundleRange);
LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundles, namespaceToBundleRange);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 2c33538..4059b18 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -52,6 +52,8 @@ import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
@@ -236,7 +238,7 @@ public class AntiAffinityNamespaceGroupTest {
brokerToDomainMap.put("brokerName-3", "domain-1");
Set<String> candidate = Sets.newHashSet();
- Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange = Maps.newHashMap();
+ ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
assertEquals(brokers.size(), totalBrokers);
@@ -322,7 +324,7 @@ public class AntiAffinityNamespaceGroupTest {
Set<String> brokers = Sets.newHashSet();
Set<String> candidate = Sets.newHashSet();
- Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange = Maps.newHashMap();
+ ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
brokers.add("broker-0");
brokers.add("broker-1");
brokers.add("broker-2");
@@ -366,10 +368,13 @@ public class AntiAffinityNamespaceGroupTest {
assertEquals(candidate.size(), 3);
}
- private void selectBrokerForNamespace(Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange,
+ private void selectBrokerForNamespace(
+ ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange,
String broker, String namespace, String assignedBundleName) {
- Map<String, Set<String>> nsToBundleMap = Maps.newHashMap();
- nsToBundleMap.put(namespace, Sets.newHashSet(assignedBundleName));
+ ConcurrentOpenHashSet<String> bundleSet = new ConcurrentOpenHashSet<>();
+ bundleSet.add(assignedBundleName);
+ ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> nsToBundleMap = new ConcurrentOpenHashMap<>();
+ nsToBundleMap.put(namespace, bundleSet);
brokerToNamespaceToBundleRange.put(broker, nsToBundleMap);
}
@@ -458,7 +463,7 @@ public class AntiAffinityNamespaceGroupTest {
Set<String> brokers = Sets.newHashSet();
Set<String> candidate = Sets.newHashSet();
- Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange = Maps.newHashMap();
+ ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
brokers.add("broker-0");
brokers.add("broker-1");
brokers.add("broker-2");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
new file mode 100644
index 0000000..b0df1bf
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class LoadManagerSharedTest {
+
+ @Test
+ public void testRemoveMostServicingBrokersForNamespace() throws Exception {
+ String namespace = "tenant1/ns1";
+ String assignedBundle = namespace + "/0x00000000_0x40000000";
+
+ Set<String> candidates = Sets.newHashSet();
+ ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> map = new ConcurrentOpenHashMap<>();
+ LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
+ Assert.assertEquals(candidates.size(), 0);
+
+ candidates = Sets.newHashSet("broker1");
+ LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
+ Assert.assertEquals(candidates.size(), 1);
+ Assert.assertTrue(candidates.contains("broker1"));
+
+ candidates = Sets.newHashSet("broker1");
+ fillBrokerToNamespaceToBundleMap(map, "broker1", namespace, "0x40000000_0x80000000");
+ LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
+ Assert.assertEquals(candidates.size(), 1);
+ Assert.assertTrue(candidates.contains("broker1"));
+
+ candidates = Sets.newHashSet("broker1", "broker2");
+ LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
+ Assert.assertEquals(candidates.size(), 1);
+ Assert.assertTrue(candidates.contains("broker2"));
+
+ candidates = Sets.newHashSet("broker1", "broker2");
+ fillBrokerToNamespaceToBundleMap(map, "broker2", namespace, "0x80000000_0xc0000000");
+ LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
+ Assert.assertEquals(candidates.size(), 2);
+ Assert.assertTrue(candidates.contains("broker1"));
+ Assert.assertTrue(candidates.contains("broker2"));
+
+ candidates = Sets.newHashSet("broker1", "broker2");
+ fillBrokerToNamespaceToBundleMap(map, "broker2", namespace, "0xc0000000_0xd0000000");
+ LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
+ Assert.assertEquals(candidates.size(), 1);
+ Assert.assertTrue(candidates.contains("broker1"));
+
+ candidates = Sets.newHashSet("broker1", "broker2", "broker3");
+ fillBrokerToNamespaceToBundleMap(map, "broker3", namespace, "0xd0000000_0xffffffff");
+ LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
+ Assert.assertEquals(candidates.size(), 2);
+ Assert.assertTrue(candidates.contains("broker1"));
+ Assert.assertTrue(candidates.contains("broker3"));
+ }
+
+ private static void fillBrokerToNamespaceToBundleMap(
+ ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> map,
+ String broker, String namespace, String bundle) {
+ map.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
+ .computeIfAbsent(namespace, k -> new ConcurrentOpenHashSet<>()).add(bundle);
+ }
+
+}