You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/01/30 19:06:40 UTC
[incubator-pulsar] branch master updated: Use Optional<String> for
returning least loaded broker (#1122)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d6d4fa3 Use Optional<String> for returning least loaded broker (#1122)
d6d4fa3 is described below
commit d6d4fa3a1103f650bf16e74429f82db3d33fe739
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jan 30 11:06:38 2018 -0800
Use Optional<String> for returning least loaded broker (#1122)
* Use Optional<String> for returning least loaded broker
* Addressed comments
* Fixed unit tests
* More test fixes
---
.../pulsar/broker/loadbalance/LoadManager.java | 3 +-
.../broker/loadbalance/ModularLoadManager.java | 14 ++++---
.../loadbalance/ModularLoadManagerStrategy.java | 7 ++--
.../loadbalance/impl/LeastLongTermMessageRate.java | 15 ++++++--
.../loadbalance/impl/ModularLoadManagerImpl.java | 44 ++++++++++++----------
.../impl/ModularLoadManagerWrapper.java | 14 +++++--
.../loadbalance/impl/SimpleLoadManagerImpl.java | 17 +++++----
.../pulsar/broker/namespace/NamespaceService.java | 22 +++++------
.../AntiAffinityNamespaceGroupTest.java | 38 +++++++++----------
.../broker/loadbalance/LoadBalancerTest.java | 12 +++---
.../loadbalance/ModularLoadManagerImplTest.java | 32 ++++++++--------
.../ModularLoadManagerStrategyTest.java | 11 +++---
.../loadbalance/SimpleLoadManagerImplTest.java | 20 ++++------
.../pulsar/client/api/BrokerServiceLookupTest.java | 13 ++++---
.../pulsar/client/api/NonPersistentTopicTest.java | 4 +-
15 files changed, 143 insertions(+), 123 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
index f9afae4..3a7409e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance;
import java.util.List;
+import java.util.Optional;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -55,7 +56,7 @@ public interface LoadManager {
/**
* Returns the Least Loaded Resource Unit decided by some algorithm or criteria which is implementation specific
*/
- ResourceUnit getLeastLoaded(ServiceUnitId su) throws Exception;
+ Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;
/**
* Generate the load report
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
index e07bc5a..9d5603d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.loadbalance;
+import java.util.Optional;
+
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -33,7 +35,7 @@ public interface ModularLoadManager {
/**
* As any broker, disable the broker this manager is running on.
- *
+ *
* @throws PulsarServerException
* If ZooKeeper failed to disable the broker.
*/
@@ -57,16 +59,16 @@ public interface ModularLoadManager {
/**
* As the leader broker, find a suitable broker for the assignment of the given bundle.
- *
+ *
* @param serviceUnit
* ServiceUnitId for the bundle.
* @return The name of the selected broker, as it appears on ZooKeeper.
*/
- String selectBrokerForAssignment(ServiceUnitId serviceUnit);
+ Optional<String> selectBrokerForAssignment(ServiceUnitId serviceUnit);
/**
* As any broker, start the load manager.
- *
+ *
* @throws PulsarServerException
* If an unexpected error prevented the load manager from being started.
*/
@@ -74,7 +76,7 @@ public interface ModularLoadManager {
/**
* As any broker, stop the load manager.
- *
+ *
* @throws PulsarServerException
* If an unexpected error occurred when attempting to stop the load manager.
*/
@@ -97,7 +99,7 @@ public interface ModularLoadManager {
/**
* Return :{@link Deserializer} to deserialize load-manager load report
- *
+ *
* @return
*/
Deserializer<? extends ServiceLookupData> getLoadReportDeserializer();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategy.java
index 098291b..7a43dc3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategy.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategy.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance;
+import java.util.Optional;
import java.util.Set;
import org.apache.pulsar.broker.BundleData;
@@ -32,7 +33,7 @@ public interface ModularLoadManagerStrategy {
/**
* Find a suitable broker to assign the given bundle to.
- *
+ *
* @param candidates
* The candidates for which the bundle may be assigned.
* @param bundleToAssign
@@ -43,12 +44,12 @@ public interface ModularLoadManagerStrategy {
* The service configuration.
* @return The name of the selected broker as it appears on ZooKeeper.
*/
- String selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+ Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
ServiceConfiguration conf);
/**
* Create a placement strategy using the configuration.
- *
+ *
* @param conf
* ServiceConfiguration to use.
* @return A placement strategy from the given configurations.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastLongTermMessageRate.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastLongTermMessageRate.java
index 473f0ca..d14a72c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastLongTermMessageRate.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastLongTermMessageRate.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance.impl;
import java.util.ArrayList;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
@@ -57,7 +58,7 @@ public class LeastLongTermMessageRate implements ModularLoadManagerStrategy {
log.warn("Broker {} is overloaded: max usage={}", brokerData.getLocalData().getWebServiceUrl(), maxUsage);
return Double.POSITIVE_INFINITY;
}
-
+
double totalMessageRate = 0;
for (BundleData bundleData : brokerData.getPreallocatedBundleData().values()) {
final TimeAverageMessageData longTermData = bundleData.getLongTermData();
@@ -79,7 +80,7 @@ public class LeastLongTermMessageRate implements ModularLoadManagerStrategy {
/**
* Find a suitable broker to assign the given bundle to.
- *
+ *
* @param candidates
* The candidates for which the bundle may be assigned.
* @param bundleToAssign
@@ -91,7 +92,7 @@ public class LeastLongTermMessageRate implements ModularLoadManagerStrategy {
* @return The name of the selected broker as it appears on ZooKeeper.
*/
@Override
- public String selectBroker(final Set<String> candidates, final BundleData bundleToAssign, final LoadData loadData,
+ public Optional<String> selectBroker(final Set<String> candidates, final BundleData bundleToAssign, final LoadData loadData,
final ServiceConfiguration conf) {
bestBrokers.clear();
double minScore = Double.POSITIVE_INFINITY;
@@ -125,6 +126,12 @@ public class LeastLongTermMessageRate implements ModularLoadManagerStrategy {
// Assign randomly in this case.
bestBrokers.addAll(candidates);
}
- return bestBrokers.get(ThreadLocalRandom.current().nextInt(bestBrokers.size()));
+
+ if (bestBrokers.isEmpty()) {
+ // If still, it means there are no available brokers at this point
+ return Optional.empty();
+ }
+
+ return Optional.of(bestBrokers.get(ThreadLocalRandom.current().nextInt(bestBrokers.size())));
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index b8519d8..4b72a10 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -35,7 +35,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
@@ -131,7 +130,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// Strategy to use for splitting bundles.
private BundleSplitStrategy bundleSplitStrategy;
-
+
// Service configuration belonging to the pulsar service.
private ServiceConfiguration conf;
@@ -174,10 +173,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// ZooKeeper belonging to the pulsar service.
private ZooKeeper zkClient;
-
+
// check if given broker can load persistent/non-persistent topic
private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
-
+
private Map<String, String> brokerToFailureDomainMap;
private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper()
@@ -197,7 +196,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
preallocatedBundleToBroker = new ConcurrentHashMap<>();
scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager"));
this.brokerToFailureDomainMap = Maps.newHashMap();
-
+
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
@@ -253,7 +252,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
bundleSplitStrategy = new BundleSplitterTask(pulsar);
-
+
conf = pulsar.getConfiguration();
// Initialize the default stats to assume for unseen bundles (hard-coded for now).
@@ -273,12 +272,12 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
localData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
localData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
-
+
placementStrategy = ModularLoadManagerStrategy.create(conf);
policies = new SimpleResourceAllocationPolicies(pulsar);
zkClient = pulsar.getZkClient();
filterPipeline.add(new BrokerVersionFilter());
-
+
refreshBrokerToFailureDomainMap();
// register listeners for domain changes
pulsar.getConfigurationCache().failureDomainListCache()
@@ -591,7 +590,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
for (Map.Entry<String, String> entry : bundlesToUnload.entrySet()) {
final String broker = entry.getKey();
final String bundle = entry.getValue();
-
+
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
if(!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
@@ -640,7 +639,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
*/
@Override
public void checkNamespaceBundleSplit() {
-
+
if (!conf.isLoadBalancerAutoBundleSplitEnabled() || pulsar.getLeaderElectionService() == null
|| !pulsar.getLeaderElectionService().isLeader()) {
return;
@@ -673,7 +672,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
}
}
-
+
}
/**
@@ -692,13 +691,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
* @return The name of the selected broker, as it appears on ZooKeeper.
*/
@Override
- public String selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
+ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
// Use brokerCandidateCache as a lock to reduce synchronization.
synchronized (brokerCandidateCache) {
final String bundle = serviceUnit.toString();
if (preallocatedBundleToBroker.containsKey(bundle)) {
// If the given bundle is already in preallocated, return the selected broker.
- return preallocatedBundleToBroker.get(bundle);
+ return Optional.of(preallocatedBundleToBroker.get(bundle));
}
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
@@ -737,13 +736,18 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
// Choose a broker among the potentially smaller filtered list, when possible
- String broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
+ Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
if (log.isDebugEnabled()) {
log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
}
+ if (!broker.isPresent()) {
+ // No brokers available
+ return broker;
+ }
+
final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
- final double maxUsage = loadData.getBrokerData().get(broker).getLocalData().getMaxResourceUsage();
+ final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
if (maxUsage > overloadThreshold) {
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
@@ -752,12 +756,12 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
// Add new bundle to preallocated.
- loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle, data);
- preallocatedBundleToBroker.put(bundle, broker);
+ loadData.getBrokerData().get(broker.get()).getPreallocatedBundleData().put(bundle, data);
+ preallocatedBundleToBroker.put(bundle, broker.get());
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
- brokerToNamespaceToBundleRange.get(broker).computeIfAbsent(namespaceName, k -> new HashSet<>())
+ brokerToNamespaceToBundleRange.get(broker.get()).computeIfAbsent(namespaceName, k -> new HashSet<>())
.add(bundleRange);
return broker;
}
@@ -827,7 +831,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
/**
* As any broker, retrieve the namespace bundle stats and system resource usage to update data local to this broker.
- * @return
+ * @return
*/
@Override
public LocalBrokerData updateLocalBrokerData() {
@@ -902,7 +906,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
}
}
-
+
private void deleteBundleDataFromZookeeper(String bundle) {
final String zooKeeperPath = getBundleDataZooKeeperPath(bundle);
try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index ed91991..54b9d56 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.loadbalance.impl;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -63,9 +64,14 @@ public class ModularLoadManagerWrapper implements LoadManager {
}
@Override
- public ResourceUnit getLeastLoaded(final ServiceUnitId serviceUnit) {
- return new SimpleResourceUnit(String.format("http://%s", loadManager.selectBrokerForAssignment(serviceUnit)),
- new PulsarResourceDescription());
+ public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
+ Optional<String> leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit);
+ if (leastLoadedBroker.isPresent()) {
+ return Optional.of(new SimpleResourceUnit(String.format("http://%s", leastLoadedBroker.get()),
+ new PulsarResourceDescription()));
+ } else {
+ return Optional.empty();
+ }
}
@Override
@@ -112,7 +118,7 @@ public class ModularLoadManagerWrapper implements LoadManager {
public Deserializer<? extends ServiceLookupData> getLoadReportDeserializer() {
return loadManager.getLoadReportDeserializer();
}
-
+
public ModularLoadManager getLoadManager() {
return loadManager;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index bdf0345..bbaf8bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.loadbalance.impl;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper;
+import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL;
import java.io.IOException;
import java.time.LocalDateTime;
@@ -30,6 +31,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executors;
@@ -47,7 +49,6 @@ import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.PlacementStrategy;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
-import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ResourceQuota;
@@ -228,7 +229,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
lastLoadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
-
+
loadReportCacheZk = new ZooKeeperDataCache<LoadReport>(pulsar.getLocalZkCache()) {
@Override
public LoadReport deserialize(String key, byte[] content) throws Exception {
@@ -940,8 +941,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
}
}
- public ResourceUnit getLeastLoaded(ServiceUnitId serviceUnit) throws Exception {
- return getLeastLoadedBroker(serviceUnit, getAvailableBrokers(serviceUnit));
+ public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId serviceUnit) throws Exception {
+ return Optional.ofNullable(getLeastLoadedBroker(serviceUnit, getAvailableBrokers(serviceUnit)));
}
public Multimap<Long, ResourceUnit> getResourceAvailabilityFor(ServiceUnitId serviceUnitId) throws Exception {
@@ -1098,8 +1099,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
}
return generateLoadReportForcefully();
}
-
- private LoadReport generateLoadReportForcefully() throws Exception {
+
+ private LoadReport generateLoadReportForcefully() throws Exception {
synchronized (bundleGainsCache) {
try {
LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
@@ -1272,7 +1273,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
/**
* Check if last generated load-report time passed the minimum time for load-report update.
- *
+ *
* @return true: if last load-report generation passed the minimum interval and load-report can be generated false:
* if load-report generation has not passed minimum interval to update load-report again
*/
@@ -1476,7 +1477,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
availableActiveBrokers.close();
scheduler.shutdown();
}
-
+
private long getBrokerZnodeOwner() {
try {
Stat stat = new Stat();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 6602766..e7b885a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -460,20 +460,20 @@ public class NamespaceService {
* @throws Exception
*/
private Optional<String> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
- ResourceUnit leastLoadedBroker = loadManager.get().getLeastLoaded(serviceUnit);
- if (leastLoadedBroker != null) {
- String lookupAddress = leastLoadedBroker.getResourceId();
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} : redirecting to the least loaded broker, lookup address={}",
- pulsar.getWebServiceAddress(), lookupAddress);
- }
- return Optional.of(lookupAddress);
- } else {
+ Optional<ResourceUnit> leastLoadedBroker = loadManager.get().getLeastLoaded(serviceUnit);
+ if (!leastLoadedBroker.isPresent()) {
LOG.warn("No broker is available for {}", serviceUnit);
return Optional.empty();
}
+
+ String lookupAddress = leastLoadedBroker.get().getResourceId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} : redirecting to the least loaded broker, lookup address={}", pulsar.getWebServiceAddress(),
+ lookupAddress);
+ }
+ return Optional.of(lookupAddress);
}
-
+
public void unloadNamespaceBundle(NamespaceBundle bundle) throws Exception {
unloadNamespaceBundle(bundle, 5, TimeUnit.MINUTES);
}
@@ -607,7 +607,7 @@ public class NamespaceService {
String msg = format("bundle %s not found under namespace", bundle.toString());
unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
}
-
+
return unloadFuture.thenApply(res -> {
if (!unload) {
return null;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index ec0bf1f..f8b584a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -185,27 +185,27 @@ public class AntiAffinityNamespaceGroupTest {
}
/**
- *
+ *
* It verifies anti-affinity-namespace assignment with failure-domain
- *
+ *
* <pre>
* Domain Brokers-count
* ________ ____________
* domain-0 broker-0,broker-1
* domain-1 broker-2,broker-3
- *
+ *
* Anti-affinity-namespace assignment
- *
+ *
* (1) ns0 -> candidate-brokers: b0, b1, b2, b3 => selected b0
* (2) ns1 -> candidate-brokers: b2, b3 => selected b2
* (3) ns2 -> candidate-brokers: b1, b3 => selected b1
* (4) ns3 -> candidate-brokers: b3 => selected b3
* (5) ns4 -> candidate-brokers: b0, b1, b2, b3 => selected b0
- *
+ *
* "candidate" broker to own anti-affinity-namespace = b2 or b4
- *
+ *
* </pre>
- *
+ *
*/
@Test
public void testAntiAffinityNamespaceFilteringWithDomain() throws Exception {
@@ -292,17 +292,17 @@ public class AntiAffinityNamespaceGroupTest {
/**
* It verifies anti-affinity-namespace assignment without failure-domain enabled
- *
+ *
* <pre>
* Anti-affinity-namespace assignment
- *
+ *
* (1) ns0 -> candidate-brokers: b0, b1, b2 => selected b0
* (2) ns1 -> candidate-brokers: b1, b2 => selected b1
* (3) ns2 -> candidate-brokers: b2 => selected b2
* (5) ns3 -> candidate-brokers: b0, b1, b2 => selected b0
* </pre>
- *
- *
+ *
+ *
* @throws Exception
*/
@Test
@@ -377,15 +377,15 @@ public class AntiAffinityNamespaceGroupTest {
/**
* It verifies anti-affinity with failure domain enabled with 2 brokers.
- *
+ *
* <pre>
* 1. Register brokers to domain: domain-1: broker1 & domain-2: broker2
* 2. Load-Manager receives a watch and updates brokerToDomain cache with new domain data
* 3. Create two namespace with anti-affinity
* 4. Load-manager selects broker for each namespace such that from different domains
- *
+ *
* </pre>
- *
+ *
* @throws Exception
*/
@Test
@@ -420,10 +420,10 @@ public class AntiAffinityNamespaceGroupTest {
assertTrue(isLoadManagerUpdatedDomainCache(secondaryLoadManager));
ServiceUnitId serviceUnit = makeBundle(property, cluster, "ns1");
- String selectedBroker1 = primaryLoadManager.selectBrokerForAssignment(serviceUnit);
+ String selectedBroker1 = primaryLoadManager.selectBrokerForAssignment(serviceUnit).get();
serviceUnit = makeBundle(property, cluster, "ns2");
- String selectedBroker2 = primaryLoadManager.selectBrokerForAssignment(serviceUnit);
+ String selectedBroker2 = primaryLoadManager.selectBrokerForAssignment(serviceUnit).get();
assertNotEquals(selectedBroker1, selectedBroker2);
@@ -432,13 +432,13 @@ public class AntiAffinityNamespaceGroupTest {
/**
* It verifies that load-shedding task should unload namespace only if there is a broker available which doesn't
* cause uneven anti-affinitiy namespace distribution.
- *
+ *
* <pre>
* 1. broker1 owns ns-0 => broker1 can unload ns-0
* 1. broker2 owns ns-1 => broker1 can unload ns-0
* 1. broker3 owns ns-2 => broker1 can't unload ns-0 as all brokers have same no NS
* </pre>
- *
+ *
* @throws Exception
*/
@Test
@@ -491,7 +491,7 @@ public class AntiAffinityNamespaceGroupTest {
/**
* It verifies that load-manager::shouldAntiAffinityNamespaceUnload checks that unloading should only happen if all
* brokers have same number of anti-affinity namespaces
- *
+ *
* @throws Exception
*/
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index a877013..1cf6822 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -225,7 +225,7 @@ public class LoadBalancerTest {
assertEquals(brokerCount, BROKER_COUNT);
DestinationName fqdn = DestinationName.get("persistent://pulsar/use/primary-ns/test-topic");
ResourceUnit found = pulsarServices[i].getLoadManager().get()
- .getLeastLoaded(pulsarServices[i].getNamespaceService().getBundle(fqdn));
+ .getLeastLoaded(pulsarServices[i].getNamespaceService().getBundle(fqdn)).get();
assertTrue(found != null);
}
} catch (InterruptedException | KeeperException e) {
@@ -264,7 +264,7 @@ public class LoadBalancerTest {
for (int i = 0; i < totalNamespaces; i++) {
DestinationName fqdn = DestinationName.get("persistent://pulsar/use/primary-ns-" + i + "/test-topic");
ResourceUnit found = pulsarServices[0].getLoadManager().get()
- .getLeastLoaded(pulsarServices[0].getNamespaceService().getBundle(fqdn));
+ .getLeastLoaded(pulsarServices[0].getNamespaceService().getBundle(fqdn)).get();
if (namespaceOwner.containsKey(found.getResourceId())) {
namespaceOwner.put(found.getResourceId(), namespaceOwner.get(found.getResourceId()) + 1);
} else {
@@ -404,7 +404,7 @@ public class LoadBalancerTest {
for (int i = 0; i < totalNamespaces; i++) {
DestinationName fqdn = DestinationName.get("persistent://pulsar/use/primary-ns-" + i + "/test-topic");
ResourceUnit found = pulsarServices[0].getLoadManager().get()
- .getLeastLoaded(pulsarServices[0].getNamespaceService().getBundle(fqdn));
+ .getLeastLoaded(pulsarServices[0].getNamespaceService().getBundle(fqdn)).get();
if (namespaceOwner.containsKey(found.getResourceId())) {
namespaceOwner.put(found.getResourceId(), namespaceOwner.get(found.getResourceId()) + 1);
} else {
@@ -831,7 +831,7 @@ public class LoadBalancerTest {
sortedRankings.set(loadManager, sortedRankingsInstance);
ResourceUnit found = ((SimpleLoadManagerImpl) loadManager)
- .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10"));
+ .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")).get();
assertEquals("http://prod1-broker1.messaging.use.example.com:8080", found.getResourceId());
zkCacheField.set(pulsarServices[0], originalLZK1);
@@ -951,7 +951,7 @@ public class LoadBalancerTest {
Map<String, Integer> namespaceOwner = new HashMap<String, Integer>();
for (int i = 0; i < totalNamespaces; i++) {
ResourceUnit found = loadManager
- .getLeastLoaded(DestinationName.get("persistent://pulsar/use/primary-ns/topic" + i));
+ .getLeastLoaded(DestinationName.get("persistent://pulsar/use/primary-ns/topic" + i)).get();
if (namespaceOwner.containsKey(found.getResourceId())) {
namespaceOwner.put(found.getResourceId(), namespaceOwner.get(found.getResourceId()) + 1);
} else {
@@ -1012,7 +1012,7 @@ public class LoadBalancerTest {
Map<String, Integer> namespaceOwner = new HashMap<String, Integer>();
for (int i = 0; i < totalNamespaces; i++) {
ResourceUnit found = loadManager
- .getLeastLoaded(DestinationName.get("persistent://pulsar/use/primary-ns/topic-" + i));
+ .getLeastLoaded(DestinationName.get("persistent://pulsar/use/primary-ns/topic-" + i)).get();
if (namespaceOwner.containsKey(found.getResourceId())) {
namespaceOwner.put(found.getResourceId(), namespaceOwner.get(found.getResourceId()) + 1);
} else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index fce825c..7d36d39 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -225,7 +225,7 @@ public class ModularLoadManagerImplTest {
// After 2 selections, the load balancer should select both brokers due to preallocation.
for (int i = 0; i < 2; ++i) {
final ServiceUnitId serviceUnit = makeBundle(Integer.toString(i));
- final String broker = primaryLoadManager.selectBrokerForAssignment(serviceUnit);
+ final String broker = primaryLoadManager.selectBrokerForAssignment(serviceUnit).get();
if (broker.equals(primaryHost)) {
foundFirst = true;
} else {
@@ -289,13 +289,13 @@ public class ModularLoadManagerImplTest {
/**
* It verifies that once broker owns max-number of topics: load-manager doesn't allocates new bundles to that broker
* unless all the brokers are in same state.
- *
+ *
* <pre>
* 1. Create a bundle whose bundle-resource-quota will contain max-topics
* 2. Load-manager assigns broker to this bundle so, assigned broker is overloaded with max-topics
* 3. For any new further bundles: broker assigns different brokers.
* </pre>
- *
+ *
* @throws Exception
*/
@Test
@@ -313,13 +313,13 @@ public class ModularLoadManagerImplTest {
final String firstBundleDataPath = String.format("%s/%s", ModularLoadManagerImpl.BUNDLE_DATA_ZPATH, bundles[0]);
ZkUtils.createFullPathOptimistic(pulsar1.getZkClient(), firstBundleDataPath, bundleData.getJsonBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- String maxTopicOwnedBroker = primaryLoadManager.selectBrokerForAssignment(bundles[0]);
+ String maxTopicOwnedBroker = primaryLoadManager.selectBrokerForAssignment(bundles[0]).get();
for (int i = 1; i < totalBundles; i++) {
assertNotEquals(primaryLoadManager.selectBrokerForAssignment(bundles[i]), maxTopicOwnedBroker);
}
}
-
+
// Test that load shedding works
@Test
public void testLoadShedding() throws Exception {
@@ -429,7 +429,7 @@ public class ModularLoadManagerImplTest {
currentData.getCpu().usage = 106;
currentData.getCpu().limit = 1000;
assert (!needUpdate.get());
-
+
// Minimally test other absolute values to ensure they are included.
lastData.getCpu().usage = 100;
lastData.getCpu().limit = 1000;
@@ -452,10 +452,10 @@ public class ModularLoadManagerImplTest {
currentData.setNumBundles(100);
assert (!needUpdate.get());
}
-
+
/**
* It verifies that deletion of broker-znode on broker-stop will invalidate availableBrokerCache list
- *
+ *
* @throws Exception
*/
@Test
@@ -471,23 +471,23 @@ public class ModularLoadManagerImplTest {
Set<String> avaialbeBrokers = loadManager.getAvailableBrokers();
assertEquals(avaialbeBrokers.size(), 1);
}
-
+
/**
* It verifies namespace-isolation policies with primary and secondary brokers.
- *
+ *
* usecase:
- *
+ *
* <pre>
* 1. Namespace: primary=broker1, secondary=broker2, shared=broker3, min_limit = 1
- * a. available-brokers: broker1, broker2, broker3 => result: broker1
+ * a. available-brokers: broker1, broker2, broker3 => result: broker1
* b. available-brokers: broker2, broker3 => result: broker2
* c. available-brokers: broker3 => result: NULL
* 2. Namespace: primary=broker1, secondary=broker2, shared=broker3, min_limit = 2
- * a. available-brokers: broker1, broker2, broker3 => result: broker1, broker2
+ * a. available-brokers: broker1, broker2, broker3 => result: broker1, broker2
* b. available-brokers: broker2, broker3 => result: broker2
* c. available-brokers: broker3 => result: NULL
* </pre>
- *
+ *
* @throws Exception
*/
@Test
@@ -587,11 +587,11 @@ public class ModularLoadManagerImplTest {
assertEquals(brokerCandidateCache.size(), 0);
}
-
+
/**
* It verifies that pulsar-service fails if load-manager tries to create ephemeral znode for broker which is already
* created by other zk-session-id.
- *
+ *
* @throws Exception
*/
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java
index d06fb08..10488c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java
@@ -18,14 +18,15 @@
*/
package org.apache.pulsar.broker.loadbalance;
+import static org.testng.Assert.assertEquals;
+
import java.util.Map;
+import java.util.Optional;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageBrokerData;
-import org.apache.pulsar.broker.loadbalance.LoadData;
-import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import org.apache.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
@@ -49,11 +50,11 @@ public class ModularLoadManagerStrategyTest {
brokerDataMap.put("3", brokerData3);
ServiceConfiguration conf = new ServiceConfiguration();
ModularLoadManagerStrategy strategy = new LeastLongTermMessageRate(conf);
- assert (strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf).equals("1"));
+ assertEquals(strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf), Optional.of("1"));
brokerData1.getTimeAverageData().setLongTermMsgRateIn(400);
- assert (strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf).equals("2"));
+ assertEquals(strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf), Optional.of("2"));
brokerData2.getLocalData().setCpu(new ResourceUsage(90, 100));
- assert (strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf).equals("3"));
+ assertEquals(strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf), Optional.of("3"));
}
private BrokerData initBrokerData() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 7457c87..e26fe34 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -36,6 +36,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -43,7 +44,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.collect.Maps;
import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.SystemUtils;
@@ -51,13 +51,6 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.cache.ResourceQuotaCache;
-import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
-import org.apache.pulsar.broker.loadbalance.LoadManager;
-import org.apache.pulsar.broker.loadbalance.LoadRanker;
-import org.apache.pulsar.broker.loadbalance.LoadReport;
-import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
-import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
-import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.GenericBrokerHostUsageImpl;
import org.apache.pulsar.broker.loadbalance.impl.LinuxBrokerHostUsageImpl;
import org.apache.pulsar.broker.loadbalance.impl.PulsarLoadReportImpl;
@@ -96,6 +89,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
@@ -233,10 +227,10 @@ public class SimpleLoadManagerImplTest {
sortedRankings.setAccessible(true);
sortedRankings.set(loadManager, sortedRankingsInstance);
- ResourceUnit found = ((SimpleLoadManagerImpl) loadManager)
+ Optional<ResourceUnit> res = ((SimpleLoadManagerImpl) loadManager)
.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10"));
// broker is not active so found should be null
- assertEquals(found, null, "found a broker when expected none to be found");
+ assertEquals(res, Optional.empty(), "found a broker when expected none to be found");
}
@@ -281,7 +275,7 @@ public class SimpleLoadManagerImplTest {
setObjectField(SimpleLoadManagerImpl.class, loadManager, "sortedRankings", sortedRankingsInstance);
ResourceUnit found = ((SimpleLoadManagerImpl) loadManager)
- .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10"));
+ .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")).get();
// broker is not active so found should be null
assertNotEquals(found, null, "did not find a broker when expected one to be found");
@@ -337,7 +331,7 @@ public class SimpleLoadManagerImplTest {
sortedRankings.set(loadManager, sortedRankingsInstance);
ResourceUnit found = ((SimpleLoadManagerImpl) loadManager)
- .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10"));
+ .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")).get();
assertEquals(found.getResourceId(), ru1.getResourceId());
zkCacheField.set(pulsar1, originalLZK1);
@@ -454,7 +448,7 @@ public class SimpleLoadManagerImplTest {
final SimpleLoadManagerImpl loadManager = (SimpleLoadManagerImpl) pulsar1.getLoadManager().get();
for (final NamespaceBundle bundle : bundles) {
- if (loadManager.getLeastLoaded(bundle).getResourceId().equals(primaryHost)) {
+ if (loadManager.getLeastLoaded(bundle).get().getResourceId().equals(primaryHost)) {
++numAssignedToPrimary;
} else {
++numAssignedToSecondary;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index e065305..458321c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -42,6 +42,7 @@ import java.security.SecureRandom;
import java.security.cert.Certificate;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -64,6 +65,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
@@ -167,7 +169,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
// mock: return Broker2 as a Least-loaded broker when leader receies request [3]
doReturn(true).when(loadManager1).isCentralized();
SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getWebServiceAddress(), null);
- doReturn(resourceUnit).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
+ doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
/**** started broker-2 ****/
@@ -259,7 +261,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
// mock: return Broker2 as a Least-loaded broker when leader receies request
doReturn(true).when(loadManager2).isCentralized();
SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getWebServiceAddress(), null);
- doReturn(resourceUnit).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
+ doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager2));
/**** started broker-2 ****/
@@ -430,7 +432,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
// request [3]
doReturn(true).when(loadManager1).isCentralized();
SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getWebServiceAddress(), null);
- doReturn(resourceUnit).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
+ doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
/**** started broker-2 ****/
@@ -827,7 +829,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
// mock: return Broker1 as a Least-loaded broker when leader receies request [3]
doReturn(true).when(loadManager1).isCentralized();
SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddress(), null);
- doReturn(resourceUnit).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
+ doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
URI broker2ServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort());
@@ -935,7 +937,8 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
// mock: return Broker1 as a Least-loaded broker when leader receies request [3]
doReturn(true).when(loadManager1).isCentralized();
SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddress(), null);
- doReturn(resourceUnit).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
+ Optional<ResourceUnit> res = Optional.of(resourceUnit);
+ doReturn(res).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
URI broker2ServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 9f551ed..cebce57 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -606,7 +606,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
LoadManager loadManager = pulsar.getLoadManager().get();
ResourceUnit broker = null;
try {
- broker = loadManager.getLeastLoaded(fdqn);
+ broker = loadManager.getLeastLoaded(fdqn).get();
} catch (Exception e) {
// Ok. (ModulearLoadManagerImpl throws RuntimeException incase don't find broker)
}
@@ -695,7 +695,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
LoadManager loadManager = pulsar.getLoadManager().get();
ResourceUnit broker = null;
try {
- broker = loadManager.getLeastLoaded(fdqn);
+ broker = loadManager.getLeastLoaded(fdqn).get();
} catch (Exception e) {
// Ok. (ModulearLoadManagerImpl throws RuntimeException incase don't find broker)
}
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.