You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/01/30 19:06:40 UTC

[incubator-pulsar] branch master updated: Use Optional<String> for returning least loaded broker (#1122)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d6d4fa3  Use Optional<String> for returning least loaded broker (#1122)
d6d4fa3 is described below

commit d6d4fa3a1103f650bf16e74429f82db3d33fe739
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jan 30 11:06:38 2018 -0800

    Use Optional<String> for returning least loaded broker (#1122)
    
    * Use Optional<String> for returning least loaded broker
    
    * Addressed comments
    
    * Fixed unit tests
    
    * More test fixes
---
 .../pulsar/broker/loadbalance/LoadManager.java     |  3 +-
 .../broker/loadbalance/ModularLoadManager.java     | 14 ++++---
 .../loadbalance/ModularLoadManagerStrategy.java    |  7 ++--
 .../loadbalance/impl/LeastLongTermMessageRate.java | 15 ++++++--
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 44 ++++++++++++----------
 .../impl/ModularLoadManagerWrapper.java            | 14 +++++--
 .../loadbalance/impl/SimpleLoadManagerImpl.java    | 17 +++++----
 .../pulsar/broker/namespace/NamespaceService.java  | 22 +++++------
 .../AntiAffinityNamespaceGroupTest.java            | 38 +++++++++----------
 .../broker/loadbalance/LoadBalancerTest.java       | 12 +++---
 .../loadbalance/ModularLoadManagerImplTest.java    | 32 ++++++++--------
 .../ModularLoadManagerStrategyTest.java            | 11 +++---
 .../loadbalance/SimpleLoadManagerImplTest.java     | 20 ++++------
 .../pulsar/client/api/BrokerServiceLookupTest.java | 13 ++++---
 .../pulsar/client/api/NonPersistentTopicTest.java  |  4 +-
 15 files changed, 143 insertions(+), 123 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
index f9afae4..3a7409e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.loadbalance;
 
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -55,7 +56,7 @@ public interface LoadManager {
     /**
      * Returns the Least Loaded Resource Unit decided by some algorithm or criteria which is implementation specific
      */
-    ResourceUnit getLeastLoaded(ServiceUnitId su) throws Exception;
+    Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;
 
     /**
      * Generate the load report
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
index e07bc5a..9d5603d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.loadbalance;
 
+import java.util.Optional;
+
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -33,7 +35,7 @@ public interface ModularLoadManager {
 
     /**
      * As any broker, disable the broker this manager is running on.
-     * 
+     *
      * @throws PulsarServerException
      *             If ZooKeeper failed to disable the broker.
      */
@@ -57,16 +59,16 @@ public interface ModularLoadManager {
 
     /**
      * As the leader broker, find a suitable broker for the assignment of the given bundle.
-     * 
+     *
      * @param serviceUnit
      *            ServiceUnitId for the bundle.
      * @return The name of the selected broker, as it appears on ZooKeeper.
      */
-    String selectBrokerForAssignment(ServiceUnitId serviceUnit);
+    Optional<String> selectBrokerForAssignment(ServiceUnitId serviceUnit);
 
     /**
      * As any broker, start the load manager.
-     * 
+     *
      * @throws PulsarServerException
      *             If an unexpected error prevented the load manager from being started.
      */
@@ -74,7 +76,7 @@ public interface ModularLoadManager {
 
     /**
      * As any broker, stop the load manager.
-     * 
+     *
      * @throws PulsarServerException
      *             If an unexpected error occurred when attempting to stop the load manager.
      */
@@ -97,7 +99,7 @@ public interface ModularLoadManager {
 
     /**
      * Return :{@link Deserializer} to deserialize load-manager load report
-     * 
+     *
      * @return
      */
     Deserializer<? extends ServiceLookupData> getLoadReportDeserializer();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategy.java
index 098291b..7a43dc3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategy.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategy.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.loadbalance;
 
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.pulsar.broker.BundleData;
@@ -32,7 +33,7 @@ public interface ModularLoadManagerStrategy {
 
     /**
      * Find a suitable broker to assign the given bundle to.
-     * 
+     *
      * @param candidates
      *            The candidates for which the bundle may be assigned.
      * @param bundleToAssign
@@ -43,12 +44,12 @@ public interface ModularLoadManagerStrategy {
      *            The service configuration.
      * @return The name of the selected broker as it appears on ZooKeeper.
      */
-    String selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+    Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
             ServiceConfiguration conf);
 
     /**
      * Create a placement strategy using the configuration.
-     * 
+     *
      * @param conf
      *            ServiceConfiguration to use.
      * @return A placement strategy from the given configurations.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastLongTermMessageRate.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastLongTermMessageRate.java
index 473f0ca..d14a72c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastLongTermMessageRate.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastLongTermMessageRate.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.loadbalance.impl;
 
 import java.util.ArrayList;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -57,7 +58,7 @@ public class LeastLongTermMessageRate implements ModularLoadManagerStrategy {
             log.warn("Broker {} is overloaded: max usage={}", brokerData.getLocalData().getWebServiceUrl(), maxUsage);
             return Double.POSITIVE_INFINITY;
         }
-        
+
         double totalMessageRate = 0;
         for (BundleData bundleData : brokerData.getPreallocatedBundleData().values()) {
             final TimeAverageMessageData longTermData = bundleData.getLongTermData();
@@ -79,7 +80,7 @@ public class LeastLongTermMessageRate implements ModularLoadManagerStrategy {
 
     /**
      * Find a suitable broker to assign the given bundle to.
-     * 
+     *
      * @param candidates
      *            The candidates for which the bundle may be assigned.
      * @param bundleToAssign
@@ -91,7 +92,7 @@ public class LeastLongTermMessageRate implements ModularLoadManagerStrategy {
      * @return The name of the selected broker as it appears on ZooKeeper.
      */
     @Override
-    public String selectBroker(final Set<String> candidates, final BundleData bundleToAssign, final LoadData loadData,
+    public Optional<String> selectBroker(final Set<String> candidates, final BundleData bundleToAssign, final LoadData loadData,
             final ServiceConfiguration conf) {
         bestBrokers.clear();
         double minScore = Double.POSITIVE_INFINITY;
@@ -125,6 +126,12 @@ public class LeastLongTermMessageRate implements ModularLoadManagerStrategy {
             // Assign randomly in this case.
             bestBrokers.addAll(candidates);
         }
-        return bestBrokers.get(ThreadLocalRandom.current().nextInt(bestBrokers.size()));
+
+        if (bestBrokers.isEmpty()) {
+            // If still, it means there are no available brokers at this point
+            return Optional.empty();
+        }
+
+        return Optional.of(bestBrokers.get(ThreadLocalRandom.current().nextInt(bestBrokers.size())));
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index b8519d8..4b72a10 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -35,7 +35,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -131,7 +130,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
 
     // Strategy to use for splitting bundles.
     private BundleSplitStrategy bundleSplitStrategy;
-    
+
     // Service configuration belonging to the pulsar service.
     private ServiceConfiguration conf;
 
@@ -174,10 +173,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
 
     // ZooKeeper belonging to the pulsar service.
     private ZooKeeper zkClient;
-    
+
     // check if given broker can load persistent/non-persistent topic
     private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
-    
+
     private Map<String, String> brokerToFailureDomainMap;
 
     private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper()
@@ -197,7 +196,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
         preallocatedBundleToBroker = new ConcurrentHashMap<>();
         scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager"));
         this.brokerToFailureDomainMap = Maps.newHashMap();
-        
+
         this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
             @Override
             public boolean isEnablePersistentTopics(String brokerUrl) {
@@ -253,7 +252,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
         }
 
         bundleSplitStrategy = new BundleSplitterTask(pulsar);
-        
+
         conf = pulsar.getConfiguration();
 
         // Initialize the default stats to assume for unseen bundles (hard-coded for now).
@@ -273,12 +272,12 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
         localData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
         localData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
 
-        
+
         placementStrategy = ModularLoadManagerStrategy.create(conf);
         policies = new SimpleResourceAllocationPolicies(pulsar);
         zkClient = pulsar.getZkClient();
         filterPipeline.add(new BrokerVersionFilter());
-        
+
         refreshBrokerToFailureDomainMap();
         // register listeners for domain changes
         pulsar.getConfigurationCache().failureDomainListCache()
@@ -591,7 +590,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
                     for (Map.Entry<String, String> entry : bundlesToUnload.entrySet()) {
                         final String broker = entry.getKey();
                         final String bundle = entry.getValue();
-                        
+
                         final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
                         final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
                         if(!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
@@ -640,7 +639,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
      */
     @Override
     public void checkNamespaceBundleSplit() {
-        
+
         if (!conf.isLoadBalancerAutoBundleSplitEnabled() || pulsar.getLeaderElectionService() == null
                 || !pulsar.getLeaderElectionService().isLeader()) {
             return;
@@ -673,7 +672,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
                 }
             }
         }
-    
+
     }
 
     /**
@@ -692,13 +691,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
      * @return The name of the selected broker, as it appears on ZooKeeper.
      */
     @Override
-    public String selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
+    public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
         // Use brokerCandidateCache as a lock to reduce synchronization.
         synchronized (brokerCandidateCache) {
             final String bundle = serviceUnit.toString();
             if (preallocatedBundleToBroker.containsKey(bundle)) {
                 // If the given bundle is already in preallocated, return the selected broker.
-                return preallocatedBundleToBroker.get(bundle);
+                return Optional.of(preallocatedBundleToBroker.get(bundle));
             }
             final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
                     key -> getBundleDataOrDefault(bundle));
@@ -737,13 +736,18 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
             }
 
             // Choose a broker among the potentially smaller filtered list, when possible
-            String broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
+            Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
             if (log.isDebugEnabled()) {
                 log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
             }
 
+            if (!broker.isPresent()) {
+                // No brokers available
+                return broker;
+            }
+
             final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
-            final double maxUsage = loadData.getBrokerData().get(broker).getLocalData().getMaxResourceUsage();
+            final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
             if (maxUsage > overloadThreshold) {
                 // All brokers that were in the filtered list were overloaded, so check if there is a better broker
                 LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
@@ -752,12 +756,12 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
             }
 
             // Add new bundle to preallocated.
-            loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle, data);
-            preallocatedBundleToBroker.put(bundle, broker);
+            loadData.getBrokerData().get(broker.get()).getPreallocatedBundleData().put(bundle, data);
+            preallocatedBundleToBroker.put(bundle, broker.get());
 
             final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
             final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
-            brokerToNamespaceToBundleRange.get(broker).computeIfAbsent(namespaceName, k -> new HashSet<>())
+            brokerToNamespaceToBundleRange.get(broker.get()).computeIfAbsent(namespaceName, k -> new HashSet<>())
                     .add(bundleRange);
             return broker;
         }
@@ -827,7 +831,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
 
     /**
      * As any broker, retrieve the namespace bundle stats and system resource usage to update data local to this broker.
-     * @return 
+     * @return
      */
     @Override
     public LocalBrokerData updateLocalBrokerData() {
@@ -902,7 +906,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
             }
         }
     }
-    
+
     private void deleteBundleDataFromZookeeper(String bundle) {
         final String zooKeeperPath = getBundleDataZooKeeperPath(bundle);
         try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index ed91991..54b9d56 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.loadbalance.impl;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -63,9 +64,14 @@ public class ModularLoadManagerWrapper implements LoadManager {
     }
 
     @Override
-    public ResourceUnit getLeastLoaded(final ServiceUnitId serviceUnit) {
-        return new SimpleResourceUnit(String.format("http://%s", loadManager.selectBrokerForAssignment(serviceUnit)),
-                new PulsarResourceDescription());
+    public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
+        Optional<String> leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit);
+        if (leastLoadedBroker.isPresent()) {
+            return Optional.of(new SimpleResourceUnit(String.format("http://%s", leastLoadedBroker.get()),
+                    new PulsarResourceDescription()));
+        } else {
+            return Optional.empty();
+        }
     }
 
     @Override
@@ -112,7 +118,7 @@ public class ModularLoadManagerWrapper implements LoadManager {
     public Deserializer<? extends ServiceLookupData> getLoadReportDeserializer() {
         return loadManager.getLoadReportDeserializer();
     }
-    
+
     public ModularLoadManager getLoadManager() {
         return loadManager;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index bdf0345..bbaf8bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.loadbalance.impl;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper;
+import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL;
 
 import java.io.IOException;
 import java.time.LocalDateTime;
@@ -30,6 +31,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Executors;
@@ -47,7 +49,6 @@ import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.PlacementStrategy;
 import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
-import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
@@ -228,7 +229,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
                 pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
         lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
         lastLoadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
-        
+
         loadReportCacheZk = new ZooKeeperDataCache<LoadReport>(pulsar.getLocalZkCache()) {
             @Override
             public LoadReport deserialize(String key, byte[] content) throws Exception {
@@ -940,8 +941,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
         }
     }
 
-    public ResourceUnit getLeastLoaded(ServiceUnitId serviceUnit) throws Exception {
-        return getLeastLoadedBroker(serviceUnit, getAvailableBrokers(serviceUnit));
+    public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId serviceUnit) throws Exception {
+        return Optional.ofNullable(getLeastLoadedBroker(serviceUnit, getAvailableBrokers(serviceUnit)));
     }
 
     public Multimap<Long, ResourceUnit> getResourceAvailabilityFor(ServiceUnitId serviceUnitId) throws Exception {
@@ -1098,8 +1099,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
         }
         return generateLoadReportForcefully();
     }
-    
-    private LoadReport generateLoadReportForcefully() throws Exception {    
+
+    private LoadReport generateLoadReportForcefully() throws Exception {
         synchronized (bundleGainsCache) {
             try {
                 LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
@@ -1272,7 +1273,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
 
     /**
      * Check if last generated load-report time passed the minimum time for load-report update.
-     * 
+     *
      * @return true: if last load-report generation passed the minimum interval and load-report can be generated false:
      *         if load-report generation has not passed minimum interval to update load-report again
      */
@@ -1476,7 +1477,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
         availableActiveBrokers.close();
         scheduler.shutdown();
     }
-    
+
     private long getBrokerZnodeOwner() {
         try {
             Stat stat = new Stat();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 6602766..e7b885a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -460,20 +460,20 @@ public class NamespaceService {
      * @throws Exception
      */
     private Optional<String> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
-        ResourceUnit leastLoadedBroker = loadManager.get().getLeastLoaded(serviceUnit);
-        if (leastLoadedBroker != null) {
-            String lookupAddress = leastLoadedBroker.getResourceId();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("{} : redirecting to the least loaded broker, lookup address={}",
-                        pulsar.getWebServiceAddress(), lookupAddress);
-            }
-            return Optional.of(lookupAddress);
-        } else {
+        Optional<ResourceUnit> leastLoadedBroker = loadManager.get().getLeastLoaded(serviceUnit);
+        if (!leastLoadedBroker.isPresent()) {
             LOG.warn("No broker is available for {}", serviceUnit);
             return Optional.empty();
         }
+
+        String lookupAddress = leastLoadedBroker.get().getResourceId();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{} : redirecting to the least loaded broker, lookup address={}", pulsar.getWebServiceAddress(),
+                    lookupAddress);
+        }
+        return Optional.of(lookupAddress);
     }
-    
+
     public void unloadNamespaceBundle(NamespaceBundle bundle) throws Exception {
         unloadNamespaceBundle(bundle, 5, TimeUnit.MINUTES);
     }
@@ -607,7 +607,7 @@ public class NamespaceService {
             String msg = format("bundle %s not found under namespace", bundle.toString());
             unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
         }
-        
+
         return unloadFuture.thenApply(res -> {
             if (!unload) {
                 return null;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index ec0bf1f..f8b584a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -185,27 +185,27 @@ public class AntiAffinityNamespaceGroupTest {
     }
 
     /**
-     * 
+     *
      * It verifies anti-affinity-namespace assignment with failure-domain
-     * 
+     *
      * <pre>
      * Domain     Brokers-count
      * ________  ____________
      * domain-0   broker-0,broker-1
      * domain-1   broker-2,broker-3
-     * 
+     *
      * Anti-affinity-namespace assignment
-     * 
+     *
      * (1) ns0 -> candidate-brokers: b0, b1, b2, b3 => selected b0
      * (2) ns1 -> candidate-brokers: b2, b3         => selected b2
      * (3) ns2 -> candidate-brokers: b1, b3         => selected b1
      * (4) ns3 -> candidate-brokers: b3             => selected b3
      * (5) ns4 -> candidate-brokers: b0, b1, b2, b3 => selected b0
-     * 
+     *
      * "candidate" broker to own anti-affinity-namespace = b2 or b4
-     * 
+     *
      * </pre>
-     * 
+     *
      */
     @Test
     public void testAntiAffinityNamespaceFilteringWithDomain() throws Exception {
@@ -292,17 +292,17 @@ public class AntiAffinityNamespaceGroupTest {
 
     /**
      * It verifies anti-affinity-namespace assignment without failure-domain enabled
-     * 
+     *
      * <pre>
      *  Anti-affinity-namespace assignment
-     * 
+     *
      * (1) ns0 -> candidate-brokers: b0, b1, b2     => selected b0
      * (2) ns1 -> candidate-brokers: b1, b2         => selected b1
      * (3) ns2 -> candidate-brokers: b2             => selected b2
      * (5) ns3 -> candidate-brokers: b0, b1, b2     => selected b0
      * </pre>
-     * 
-     * 
+     *
+     *
      * @throws Exception
      */
     @Test
@@ -377,15 +377,15 @@ public class AntiAffinityNamespaceGroupTest {
 
     /**
      * It verifies anti-affinity with failure domain enabled with 2 brokers.
-     * 
+     *
      * <pre>
      * 1. Register brokers to domain: domain-1: broker1 & domain-2: broker2
      * 2. Load-Manager receives a watch and updates brokerToDomain cache with new domain data
      * 3. Create two namespace with anti-affinity
      * 4. Load-manager selects broker for each namespace such that from different domains
-     * 
+     *
      * </pre>
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -420,10 +420,10 @@ public class AntiAffinityNamespaceGroupTest {
         assertTrue(isLoadManagerUpdatedDomainCache(secondaryLoadManager));
 
         ServiceUnitId serviceUnit = makeBundle(property, cluster, "ns1");
-        String selectedBroker1 = primaryLoadManager.selectBrokerForAssignment(serviceUnit);
+        String selectedBroker1 = primaryLoadManager.selectBrokerForAssignment(serviceUnit).get();
 
         serviceUnit = makeBundle(property, cluster, "ns2");
-        String selectedBroker2 = primaryLoadManager.selectBrokerForAssignment(serviceUnit);
+        String selectedBroker2 = primaryLoadManager.selectBrokerForAssignment(serviceUnit).get();
 
         assertNotEquals(selectedBroker1, selectedBroker2);
 
@@ -432,13 +432,13 @@ public class AntiAffinityNamespaceGroupTest {
     /**
      * It verifies that load-shedding task should unload namespace only if there is a broker available which doesn't
      * cause uneven anti-affinitiy namespace distribution.
-     * 
+     *
      * <pre>
      * 1. broker1 owns ns-0 => broker1 can unload ns-0
      * 1. broker2 owns ns-1 => broker1 can unload ns-0
      * 1. broker3 owns ns-2 => broker1 can't unload ns-0 as all brokers have same no NS
      * </pre>
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -491,7 +491,7 @@ public class AntiAffinityNamespaceGroupTest {
     /**
      * It verifies that load-manager::shouldAntiAffinityNamespaceUnload checks that unloading should only happen if all
      * brokers have same number of anti-affinity namespaces
-     * 
+     *
      * @throws Exception
      */
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index a877013..1cf6822 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -225,7 +225,7 @@ public class LoadBalancerTest {
                 assertEquals(brokerCount, BROKER_COUNT);
                 DestinationName fqdn = DestinationName.get("persistent://pulsar/use/primary-ns/test-topic");
                 ResourceUnit found = pulsarServices[i].getLoadManager().get()
-                        .getLeastLoaded(pulsarServices[i].getNamespaceService().getBundle(fqdn));
+                        .getLeastLoaded(pulsarServices[i].getNamespaceService().getBundle(fqdn)).get();
                 assertTrue(found != null);
             }
         } catch (InterruptedException | KeeperException e) {
@@ -264,7 +264,7 @@ public class LoadBalancerTest {
         for (int i = 0; i < totalNamespaces; i++) {
             DestinationName fqdn = DestinationName.get("persistent://pulsar/use/primary-ns-" + i + "/test-topic");
             ResourceUnit found = pulsarServices[0].getLoadManager().get()
-                    .getLeastLoaded(pulsarServices[0].getNamespaceService().getBundle(fqdn));
+                    .getLeastLoaded(pulsarServices[0].getNamespaceService().getBundle(fqdn)).get();
             if (namespaceOwner.containsKey(found.getResourceId())) {
                 namespaceOwner.put(found.getResourceId(), namespaceOwner.get(found.getResourceId()) + 1);
             } else {
@@ -404,7 +404,7 @@ public class LoadBalancerTest {
         for (int i = 0; i < totalNamespaces; i++) {
             DestinationName fqdn = DestinationName.get("persistent://pulsar/use/primary-ns-" + i + "/test-topic");
             ResourceUnit found = pulsarServices[0].getLoadManager().get()
-                    .getLeastLoaded(pulsarServices[0].getNamespaceService().getBundle(fqdn));
+                    .getLeastLoaded(pulsarServices[0].getNamespaceService().getBundle(fqdn)).get();
             if (namespaceOwner.containsKey(found.getResourceId())) {
                 namespaceOwner.put(found.getResourceId(), namespaceOwner.get(found.getResourceId()) + 1);
             } else {
@@ -831,7 +831,7 @@ public class LoadBalancerTest {
         sortedRankings.set(loadManager, sortedRankingsInstance);
 
         ResourceUnit found = ((SimpleLoadManagerImpl) loadManager)
-                .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10"));
+                .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")).get();
         assertEquals("http://prod1-broker1.messaging.use.example.com:8080", found.getResourceId());
 
         zkCacheField.set(pulsarServices[0], originalLZK1);
@@ -951,7 +951,7 @@ public class LoadBalancerTest {
         Map<String, Integer> namespaceOwner = new HashMap<String, Integer>();
         for (int i = 0; i < totalNamespaces; i++) {
             ResourceUnit found = loadManager
-                    .getLeastLoaded(DestinationName.get("persistent://pulsar/use/primary-ns/topic" + i));
+                    .getLeastLoaded(DestinationName.get("persistent://pulsar/use/primary-ns/topic" + i)).get();
             if (namespaceOwner.containsKey(found.getResourceId())) {
                 namespaceOwner.put(found.getResourceId(), namespaceOwner.get(found.getResourceId()) + 1);
             } else {
@@ -1012,7 +1012,7 @@ public class LoadBalancerTest {
         Map<String, Integer> namespaceOwner = new HashMap<String, Integer>();
         for (int i = 0; i < totalNamespaces; i++) {
             ResourceUnit found = loadManager
-                    .getLeastLoaded(DestinationName.get("persistent://pulsar/use/primary-ns/topic-" + i));
+                    .getLeastLoaded(DestinationName.get("persistent://pulsar/use/primary-ns/topic-" + i)).get();
             if (namespaceOwner.containsKey(found.getResourceId())) {
                 namespaceOwner.put(found.getResourceId(), namespaceOwner.get(found.getResourceId()) + 1);
             } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index fce825c..7d36d39 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -225,7 +225,7 @@ public class ModularLoadManagerImplTest {
         // After 2 selections, the load balancer should select both brokers due to preallocation.
         for (int i = 0; i < 2; ++i) {
             final ServiceUnitId serviceUnit = makeBundle(Integer.toString(i));
-            final String broker = primaryLoadManager.selectBrokerForAssignment(serviceUnit);
+            final String broker = primaryLoadManager.selectBrokerForAssignment(serviceUnit).get();
             if (broker.equals(primaryHost)) {
                 foundFirst = true;
             } else {
@@ -289,13 +289,13 @@ public class ModularLoadManagerImplTest {
     /**
      * It verifies that once broker owns max-number of topics: load-manager doesn't allocates new bundles to that broker
      * unless all the brokers are in same state.
-     * 
+     *
      * <pre>
      * 1. Create a bundle whose bundle-resource-quota will contain max-topics
      * 2. Load-manager assigns broker to this bundle so, assigned broker is overloaded with max-topics
      * 3. For any new further bundles: broker assigns different brokers.
      * </pre>
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -313,13 +313,13 @@ public class ModularLoadManagerImplTest {
         final String firstBundleDataPath = String.format("%s/%s", ModularLoadManagerImpl.BUNDLE_DATA_ZPATH, bundles[0]);
         ZkUtils.createFullPathOptimistic(pulsar1.getZkClient(), firstBundleDataPath, bundleData.getJsonBytes(),
                 ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        String maxTopicOwnedBroker = primaryLoadManager.selectBrokerForAssignment(bundles[0]);
+        String maxTopicOwnedBroker = primaryLoadManager.selectBrokerForAssignment(bundles[0]).get();
 
         for (int i = 1; i < totalBundles; i++) {
             assertNotEquals(primaryLoadManager.selectBrokerForAssignment(bundles[i]), maxTopicOwnedBroker);
         }
     }
-    
+
     // Test that load shedding works
     @Test
     public void testLoadShedding() throws Exception {
@@ -429,7 +429,7 @@ public class ModularLoadManagerImplTest {
         currentData.getCpu().usage = 106;
         currentData.getCpu().limit = 1000;
         assert (!needUpdate.get());
-        
+
         // Minimally test other absolute values to ensure they are included.
         lastData.getCpu().usage = 100;
         lastData.getCpu().limit = 1000;
@@ -452,10 +452,10 @@ public class ModularLoadManagerImplTest {
         currentData.setNumBundles(100);
         assert (!needUpdate.get());
     }
-    
+
     /**
      * It verifies that deletion of broker-znode on broker-stop will invalidate availableBrokerCache list
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -471,23 +471,23 @@ public class ModularLoadManagerImplTest {
         Set<String> avaialbeBrokers = loadManager.getAvailableBrokers();
         assertEquals(avaialbeBrokers.size(), 1);
     }
-    
+
     /**
      * It verifies namespace-isolation policies with primary and secondary brokers.
-     * 
+     *
      * usecase:
-     * 
+     *
      * <pre>
      *  1. Namespace: primary=broker1, secondary=broker2, shared=broker3, min_limit = 1
-     *     a. available-brokers: broker1, broker2, broker3 => result: broker1 
+     *     a. available-brokers: broker1, broker2, broker3 => result: broker1
      *     b. available-brokers: broker2, broker3          => result: broker2
      *     c. available-brokers: broker3                   => result: NULL
      *  2. Namespace: primary=broker1, secondary=broker2, shared=broker3, min_limit = 2
-     *     a. available-brokers: broker1, broker2, broker3 => result: broker1, broker2 
+     *     a. available-brokers: broker1, broker2, broker3 => result: broker1, broker2
      *     b. available-brokers: broker2, broker3          => result: broker2
      *     c. available-brokers: broker3                   => result: NULL
      * </pre>
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -587,11 +587,11 @@ public class ModularLoadManagerImplTest {
         assertEquals(brokerCandidateCache.size(), 0);
 
     }
-    
+
     /**
      * It verifies that pulsar-service fails if load-manager tries to create ephemeral znode for broker which is already
      * created by other zk-session-id.
-     * 
+     *
      * @throws Exception
      */
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java
index d06fb08..10488c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java
@@ -18,14 +18,15 @@
  */
 package org.apache.pulsar.broker.loadbalance;
 
+import static org.testng.Assert.assertEquals;
+
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.pulsar.broker.BrokerData;
 import org.apache.pulsar.broker.BundleData;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.TimeAverageBrokerData;
-import org.apache.pulsar.broker.loadbalance.LoadData;
-import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
 import org.apache.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
@@ -49,11 +50,11 @@ public class ModularLoadManagerStrategyTest {
         brokerDataMap.put("3", brokerData3);
         ServiceConfiguration conf = new ServiceConfiguration();
         ModularLoadManagerStrategy strategy = new LeastLongTermMessageRate(conf);
-        assert (strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf).equals("1"));
+        assertEquals(strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf), Optional.of("1"));
         brokerData1.getTimeAverageData().setLongTermMsgRateIn(400);
-        assert (strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf).equals("2"));
+        assertEquals(strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf), Optional.of("2"));
         brokerData2.getLocalData().setCpu(new ResourceUsage(90, 100));
-        assert (strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf).equals("3"));
+        assertEquals(strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf), Optional.of("3"));
     }
 
     private BrokerData initBrokerData() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 7457c87..e26fe34 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -36,6 +36,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -43,7 +44,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.collect.Maps;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.SystemUtils;
@@ -51,13 +51,6 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.cache.ResourceQuotaCache;
-import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
-import org.apache.pulsar.broker.loadbalance.LoadManager;
-import org.apache.pulsar.broker.loadbalance.LoadRanker;
-import org.apache.pulsar.broker.loadbalance.LoadReport;
-import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
-import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
-import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 import org.apache.pulsar.broker.loadbalance.impl.GenericBrokerHostUsageImpl;
 import org.apache.pulsar.broker.loadbalance.impl.LinuxBrokerHostUsageImpl;
 import org.apache.pulsar.broker.loadbalance.impl.PulsarLoadReportImpl;
@@ -96,6 +89,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 /**
@@ -233,10 +227,10 @@ public class SimpleLoadManagerImplTest {
         sortedRankings.setAccessible(true);
         sortedRankings.set(loadManager, sortedRankingsInstance);
 
-        ResourceUnit found = ((SimpleLoadManagerImpl) loadManager)
+        Optional<ResourceUnit> res = ((SimpleLoadManagerImpl) loadManager)
                 .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10"));
         // broker is not active so found should be null
-        assertEquals(found, null, "found a broker when expected none to be found");
+        assertEquals(res, Optional.empty(), "found a broker when expected none to be found");
 
     }
 
@@ -281,7 +275,7 @@ public class SimpleLoadManagerImplTest {
         setObjectField(SimpleLoadManagerImpl.class, loadManager, "sortedRankings", sortedRankingsInstance);
 
         ResourceUnit found = ((SimpleLoadManagerImpl) loadManager)
-                .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10"));
+                .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")).get();
         // a broker is expected to be found here, so found should not be null
         assertNotEquals(found, null, "did not find a broker when expected one to be found");
 
@@ -337,7 +331,7 @@ public class SimpleLoadManagerImplTest {
         sortedRankings.set(loadManager, sortedRankingsInstance);
 
         ResourceUnit found = ((SimpleLoadManagerImpl) loadManager)
-                .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10"));
+                .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")).get();
         assertEquals(found.getResourceId(), ru1.getResourceId());
 
         zkCacheField.set(pulsar1, originalLZK1);
@@ -454,7 +448,7 @@ public class SimpleLoadManagerImplTest {
         final SimpleLoadManagerImpl loadManager = (SimpleLoadManagerImpl) pulsar1.getLoadManager().get();
 
         for (final NamespaceBundle bundle : bundles) {
-            if (loadManager.getLeastLoaded(bundle).getResourceId().equals(primaryHost)) {
+            if (loadManager.getLeastLoaded(bundle).get().getResourceId().equals(primaryHost)) {
                 ++numAssignedToPrimary;
             } else {
                 ++numAssignedToSecondary;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index e065305..458321c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -42,6 +42,7 @@ import java.security.SecureRandom;
 import java.security.cert.Certificate;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -64,6 +65,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
@@ -167,7 +169,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         // mock: return Broker2 as a Least-loaded broker when leader receives request [3]
         doReturn(true).when(loadManager1).isCentralized();
         SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getWebServiceAddress(), null);
-        doReturn(resourceUnit).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
+        doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
         loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
 
         /**** started broker-2 ****/
@@ -259,7 +261,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         // mock: return Broker2 as a Least-loaded broker when leader receives request
         doReturn(true).when(loadManager2).isCentralized();
         SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getWebServiceAddress(), null);
-        doReturn(resourceUnit).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
+        doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
         loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager2));
         /**** started broker-2 ****/
 
@@ -430,7 +432,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
 		// request [3]
 		doReturn(true).when(loadManager1).isCentralized();
 		SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getWebServiceAddress(), null);
-		doReturn(resourceUnit).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
+		doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
 		loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
 
 		/**** started broker-2 ****/
@@ -827,7 +829,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         // mock: return Broker1 as a Least-loaded broker when leader receives request [3]
         doReturn(true).when(loadManager1).isCentralized();
         SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddress(), null);
-        doReturn(resourceUnit).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
+        doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
         loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
 
         URI broker2ServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort());
@@ -935,7 +937,8 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
             // mock: return Broker1 as a Least-loaded broker when leader receives request [3]
             doReturn(true).when(loadManager1).isCentralized();
             SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddress(), null);
-            doReturn(resourceUnit).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
+            Optional<ResourceUnit> res = Optional.of(resourceUnit);
+            doReturn(res).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
             loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
 
             URI broker2ServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 9f551ed..cebce57 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -606,7 +606,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             LoadManager loadManager = pulsar.getLoadManager().get();
             ResourceUnit broker = null;
             try {
-                broker = loadManager.getLeastLoaded(fdqn);
+                broker = loadManager.getLeastLoaded(fdqn).get();
             } catch (Exception e) {
                 // Ok. (ModularLoadManagerImpl throws RuntimeException in case it doesn't find a broker)
             }
@@ -695,7 +695,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             LoadManager loadManager = pulsar.getLoadManager().get();
             ResourceUnit broker = null;
             try {
-                broker = loadManager.getLeastLoaded(fdqn);
+                broker = loadManager.getLeastLoaded(fdqn).get();
             } catch (Exception e) {
                 // Ok. (ModularLoadManagerImpl throws RuntimeException in case it doesn't find a broker)
             }

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.