You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/01/21 20:49:07 UTC

(pulsar) 01/02: [fix][broker] Fix leader broker cannot be determined when the advertised address and advertised listeners are configured (#21894)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 940ede5ea635c9a51cd877cdb6b2d0ee07a651b4
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Sun Jan 21 22:44:23 2024 +0200

    [fix][broker] Fix leader broker cannot be determined when the advertised address and advertised listeners are configured (#21894)
    
    (cherry picked from commit 3158fd3550f9e3a0b2c0316c92265318b209f4f5)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  23 +-
 .../pulsar/broker/admin/impl/BrokersBase.java      |  12 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  11 +-
 .../pulsar/broker/loadbalance/LeaderBroker.java    |  18 ++
 .../broker/loadbalance/LeaderElectionService.java  |  12 +-
 .../pulsar/broker/loadbalance/NoopLoadManager.java |  14 +-
 .../pulsar/broker/loadbalance/ResourceUnit.java    |   2 -
 .../loadbalance/extensions/BrokerRegistryImpl.java |   2 +-
 .../extensions/ExtensibleLoadManagerImpl.java      |  15 +-
 .../channel/ServiceUnitStateChannelImpl.java       |  38 +--
 .../policies/IsolationPoliciesHelper.java          |   8 +-
 .../reporter/BrokerLoadDataReporter.java           |  14 +-
 .../reporter/TopBundleLoadDataReporter.java        |  14 +-
 .../broker/loadbalance/impl/LoadManagerShared.java |  99 +++----
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  15 +-
 .../impl/ModularLoadManagerWrapper.java            |  21 +-
 .../loadbalance/impl/SimpleLoadManagerImpl.java    |  42 ++-
 .../pulsar/broker/namespace/NamespaceService.java  |  75 ++---
 .../pulsar/broker/service/BrokerService.java       |   2 +-
 .../service/nonpersistent/NonPersistentTopic.java  |   2 +-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../pulsar/broker/web/PulsarWebResource.java       |  54 ++--
 .../apache/pulsar/broker/SLAMonitoringTest.java    |  16 +-
 .../broker/admin/AdminApiMultiBrokersTest.java     |   5 +-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  22 +-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   3 +-
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    |   2 +-
 ...isedListenersMultiBrokerLeaderElectionTest.java |  42 +++
 .../loadbalance/LeaderElectionServiceTest.java     |   3 +-
 .../broker/loadbalance/LoadBalancerTest.java       |   8 +-
 .../loadbalance/MultiBrokerLeaderElectionTest.java |  94 +++++--
 .../loadbalance/SimpleLoadManagerImplTest.java     |  34 +--
 .../loadbalance/extensions/BrokerRegistryTest.java |   4 +-
 .../extensions/ExtensibleLoadManagerImplTest.java  |  52 ++--
 .../channel/ServiceUnitStateChannelTest.java       | 312 ++++++++++-----------
 .../filter/BrokerIsolationPoliciesFilterTest.java  |  64 ++---
 .../extensions/scheduler/TransferShedderTest.java  | 296 +++++++++----------
 .../impl/ModularLoadManagerImplTest.java           | 130 ++++-----
 .../broker/namespace/NamespaceServiceTest.java     |  39 ++-
 .../pulsar/broker/service/BrokerServiceTest.java   |  10 +-
 .../broker/service/InactiveTopicDeleteTest.java    |   4 +-
 .../systopic/PartitionedSystemTopicTest.java       |  12 +-
 .../broker/testcontext/PulsarTestContext.java      |  45 ++-
 .../pulsar/client/api/BrokerServiceLookupTest.java |  10 +-
 .../apache/pulsar/compaction/CompactionTest.java   |   4 +-
 .../pulsar/common/policies/data/BrokerInfo.java    |   2 +
 .../common/policies/data/impl/BrokerInfoImpl.java  |   9 +-
 .../server/ProxyWithExtensibleLoadManagerTest.java |   2 +-
 48 files changed, 941 insertions(+), 778 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index a04a4c137cc..42d43b3dcf2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -275,6 +275,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     private TransactionPendingAckStoreProvider transactionPendingAckStoreProvider;
     private final ExecutorProvider transactionExecutorProvider;
     private final DefaultMonotonicSnapshotClock monotonicSnapshotClock;
+    private String brokerId;
 
     public enum State {
         Init, Started, Closing, Closed
@@ -307,6 +308,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         // Validate correctness of configuration
         PulsarConfigurationLoader.isComplete(config);
         TransactionBatchedWriteValidator.validate(config);
+        this.config = config;
 
         // validate `advertisedAddress`, `advertisedListeners`, `internalListenerName`
         this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
@@ -317,7 +319,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         // use `internalListenerName` listener as `advertisedAddress`
         this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
         this.brokerVersion = PulsarVersion.getVersion();
-        this.config = config;
         this.processTerminator = processTerminator;
         this.loadManagerExecutor = Executors
                 .newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
@@ -828,6 +829,12 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             this.brokerServiceUrl = brokerUrl(config);
             this.brokerServiceUrlTls = brokerUrlTls(config);
 
+            // the broker id is used in the load manager to identify the broker
+            this.brokerId =
+                    String.format("%s:%s", advertisedAddress, config.getWebServicePortTls().isPresent()
+                            ? config.getWebServicePortTls().get()
+                            : config.getWebServicePort().orElseThrow());
+
             if (this.compactionServiceFactory == null) {
                 this.compactionServiceFactory = loadCompactionServiceFactory();
             }
@@ -1099,7 +1106,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     }
 
     private void handleDeleteCluster(Notification notification) {
-        if (ClusterResources.pathRepresentsClusterName(notification.getPath())
+        if (isRunning() && ClusterResources.pathRepresentsClusterName(notification.getPath())
                 && notification.getType() == NotificationType.Deleted) {
             final String clusterName = ClusterResources.clusterNameFromPath(notification.getPath());
             getBrokerService().closeAndRemoveReplicationClient(clusterName);
@@ -1137,7 +1144,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             LOG.info("The load manager extension is enabled. Skipping PulsarService LeaderElectionService.");
             return;
         }
-        this.leaderElectionService = new LeaderElectionService(coordinationService, getSafeWebServiceAddress(),
+        this.leaderElectionService =
+                new LeaderElectionService(coordinationService, getBrokerId(), getSafeWebServiceAddress(),
                 state -> {
                     if (state == LeaderElectionState.Leading) {
                         LOG.info("This broker was elected leader");
@@ -1185,7 +1193,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     protected void acquireSLANamespace() {
         try {
             // Namespace not created hence no need to unload it
-            NamespaceName nsName = NamespaceService.getSLAMonitorNamespace(getLookupServiceAddress(), config);
+            NamespaceName nsName = NamespaceService.getSLAMonitorNamespace(getBrokerId(), config);
             if (!this.pulsarResources.getNamespaceResources().namespaceExists(nsName)) {
                 LOG.info("SLA Namespace = {} doesn't exist.", nsName);
                 return;
@@ -1694,10 +1702,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         return brokerServiceUrlTls != null ? brokerServiceUrlTls : brokerServiceUrl;
     }
 
-    public String getLookupServiceAddress() {
-        return String.format("%s:%s", advertisedAddress, config.getWebServicePortTls().isPresent()
-                ? config.getWebServicePortTls().get()
-                : config.getWebServicePort().orElseThrow());
+    public String getBrokerId() {
+        return Objects.requireNonNull(brokerId,
+                "brokerId is not initialized before start has been called");
     }
 
     public synchronized void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 3fb1941b33a..ad3d7e789e4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -141,7 +141,9 @@ public class BrokersBase extends AdminResource {
         validateSuperUserAccessAsync().thenAccept(__ -> {
                     LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader()
                             .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find leader broker"));
-                    BrokerInfo brokerInfo = BrokerInfo.builder().serviceUrl(leaderBroker.getServiceUrl()).build();
+                    BrokerInfo brokerInfo = BrokerInfo.builder()
+                            .serviceUrl(leaderBroker.getServiceUrl())
+                            .brokerId(leaderBroker.getBrokerId()).build();
                     LOG.info("[{}] Successfully to get the information of the leader broker.", clientAppId());
                     asyncResponse.resume(brokerInfo);
                 })
@@ -164,7 +166,7 @@ public class BrokersBase extends AdminResource {
                                    @PathParam("clusterName") String cluster,
                                    @PathParam("broker-webserviceurl") String broker) {
         validateSuperUserAccessAsync()
-                .thenAccept(__ -> validateBrokerName(broker))
+                .thenCompose(__ -> maybeRedirectToBroker(broker))
                 .thenCompose(__ -> validateClusterOwnershipAsync(cluster))
                 .thenCompose(__ -> pulsar().getNamespaceService().getOwnedNameSpacesStatusAsync())
                 .thenAccept(asyncResponse::resume)
@@ -396,10 +398,10 @@ public class BrokersBase extends AdminResource {
 
 
     private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) {
-        String lookupServiceAddress = pulsar().getLookupServiceAddress();
+        String brokerId = pulsar().getBrokerId();
         NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
-                ? NamespaceService.getHeartbeatNamespaceV2(lookupServiceAddress, pulsar().getConfiguration())
-                : NamespaceService.getHeartbeatNamespace(lookupServiceAddress, pulsar().getConfiguration());
+                ? NamespaceService.getHeartbeatNamespaceV2(brokerId, pulsar().getConfiguration())
+                : NamespaceService.getHeartbeatNamespace(brokerId, pulsar().getConfiguration());
         final String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
         LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName);
         final String messageStr = UUID.randomUUID().toString();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index caaff010439..f274cffa46b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -923,9 +923,9 @@ public abstract class NamespacesBase extends AdminResource {
             return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, errorStr));
         }
         LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get();
-        String leaderBrokerUrl = leaderBroker.getServiceUrl();
+        String leaderBrokerId = leaderBroker.getBrokerId();
         return pulsar().getNamespaceService()
-                .createLookupResult(leaderBrokerUrl, false, null)
+                .createLookupResult(leaderBrokerId, false, null)
                 .thenCompose(lookupResult -> {
                     String redirectUrl = isRequestHttps() ? lookupResult.getLookupData().getHttpUrlTls()
                             : lookupResult.getLookupData().getHttpUrl();
@@ -948,7 +948,7 @@ public abstract class NamespacesBase extends AdminResource {
                         return FutureUtil.failedFuture((
                                 new WebApplicationException(Response.temporaryRedirect(redirect).build())));
                     } catch (MalformedURLException exception) {
-                        log.error("The leader broker url is malformed - {}", leaderBrokerUrl);
+                        log.error("The redirect url is malformed - {}", redirectUrl);
                         return FutureUtil.failedFuture(new RestException(exception));
                     }
                 });
@@ -984,8 +984,11 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String bundleRange,
-                                                                      String destinationBroker,
+                                                                      String destinationBrokerParam,
                                                                       boolean authoritative) {
+        String destinationBroker = StringUtils.isBlank(destinationBrokerParam) ? null :
+                // ensure backward compatibility: strip the possible http:// or https:// prefix
+                destinationBrokerParam.replaceFirst("http[s]?://", "");
         return validateSuperUserAccessAsync()
                 .thenCompose(__ -> setNamespaceBundleAffinityAsync(bundleRange, destinationBroker))
                 .thenAccept(__ -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderBroker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderBroker.java
index acd34e151ed..d7c21de5ea1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderBroker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderBroker.java
@@ -30,5 +30,23 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor
 @NoArgsConstructor
 public class LeaderBroker {
+    private String brokerId;
     private String serviceUrl;
+
+    public String getBrokerId() {
+        if (brokerId != null) {
+            return brokerId;
+        } else {
+            // for backward compatibility at runtime with older versions of Pulsar
+            return parseHostAndPort(serviceUrl);
+        }
+    }
+
+    private static String parseHostAndPort(String serviceUrl) {
+        int uriSeparatorPos = serviceUrl.indexOf("://");
+        if (uriSeparatorPos == -1) {
+            throw new IllegalArgumentException("'" + serviceUrl + "' isn't an URI.");
+        }
+        return serviceUrl.substring(uriSeparatorPos + 3);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
index 05fe4353f3e..2e53b54e98f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
@@ -35,17 +35,17 @@ public class LeaderElectionService implements AutoCloseable {
     private final LeaderElection<LeaderBroker> leaderElection;
     private final LeaderBroker localValue;
 
-    public LeaderElectionService(CoordinationService cs, String localWebServiceAddress,
-            Consumer<LeaderElectionState> listener) {
-        this(cs, localWebServiceAddress, ELECTION_ROOT, listener);
+    public LeaderElectionService(CoordinationService cs, String brokerId,
+                                 String serviceUrl, Consumer<LeaderElectionState> listener) {
+        this(cs, brokerId, serviceUrl, ELECTION_ROOT, listener);
     }
 
     public LeaderElectionService(CoordinationService cs,
-                                 String localWebServiceAddress,
-                                 String electionRoot,
+                                 String brokerId,
+                                 String serviceUrl, String electionRoot,
                                  Consumer<LeaderElectionState> listener) {
         this.leaderElection = cs.getLeaderElection(LeaderBroker.class, electionRoot, listener);
-        this.localValue = new LeaderBroker(localWebServiceAddress);
+        this.localValue = new LeaderBroker(brokerId, serviceUrl);
     }
 
     public void start() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
index 80f887d394d..f9f36b705d4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
@@ -43,7 +43,7 @@ import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 public class NoopLoadManager implements LoadManager {
 
     private PulsarService pulsar;
-    private String lookupServiceAddress;
+    private String brokerId;
     private ResourceUnit localResourceUnit;
     private LockManager<LocalBrokerData> lockManager;
     private Map<String, String> bundleBrokerAffinityMap;
@@ -57,16 +57,15 @@ public class NoopLoadManager implements LoadManager {
 
     @Override
     public void start() throws PulsarServerException {
-        lookupServiceAddress = pulsar.getLookupServiceAddress();
-        localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
-                new PulsarResourceDescription());
+        brokerId = pulsar.getBrokerId();
+        localResourceUnit = new SimpleResourceUnit(brokerId, new PulsarResourceDescription());
 
         LocalBrokerData localData = new LocalBrokerData(pulsar.getWebServiceAddress(),
                 pulsar.getWebServiceAddressTls(),
                 pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
         localData.setProtocols(pulsar.getProtocolDataToAdvertise());
         localData.setLoadManagerClassName(this.pulsar.getConfig().getLoadManagerClassName());
-        String brokerReportPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+        String brokerReportPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + brokerId;
 
         try {
             log.info("Acquiring broker resource lock on {}", brokerReportPath);
@@ -129,12 +128,12 @@ public class NoopLoadManager implements LoadManager {
 
     @Override
     public Set<String> getAvailableBrokers() throws Exception {
-        return Collections.singleton(lookupServiceAddress);
+        return Collections.singleton(brokerId);
     }
 
     @Override
     public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
-        return CompletableFuture.completedFuture(Collections.singleton(lookupServiceAddress));
+        return CompletableFuture.completedFuture(Collections.singleton(brokerId));
     }
 
     @Override
@@ -153,7 +152,6 @@ public class NoopLoadManager implements LoadManager {
         if (StringUtils.isBlank(broker)) {
             return this.bundleBrokerAffinityMap.remove(bundle);
         }
-        broker = broker.replaceFirst("http[s]?://", "");
         return this.bundleBrokerAffinityMap.put(bundle, broker);
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java
index ef4dd2a97b2..c28a8be4c0d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java
@@ -23,8 +23,6 @@ package org.apache.pulsar.broker.loadbalance;
  */
 public interface ResourceUnit extends Comparable<ResourceUnit> {
 
-    String PROPERTY_KEY_BROKER_ZNODE_NAME = "__advertised_addr";
-
     String getResourceId();
 
     ResourceDescription getAvailableResource();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index bfdaa078f19..18e30ddf922 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -82,7 +82,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
         this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
         this.scheduler = pulsar.getLoadManagerExecutor();
         this.listeners = new ArrayList<>();
-        this.brokerId = pulsar.getLookupServiceAddress();
+        this.brokerId = pulsar.getBrokerId();
         this.brokerLookupData = new BrokerLookupData(
                 pulsar.getWebServiceAddress(),
                 pulsar.getWebServiceAddressTls(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 581183cf95a..dee660e1c3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -348,7 +348,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
         try {
             this.brokerRegistry = new BrokerRegistryImpl(pulsar);
             this.leaderElectionService = new LeaderElectionService(
-                    pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
+                    pulsar.getCoordinationService(), pulsar.getBrokerId(),
+                    pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
                     state -> {
                         pulsar.getLoadManagerExecutor().execute(() -> {
                             if (state == LeaderElectionState.Leading) {
@@ -790,7 +791,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
     @VisibleForTesting
     void playLeader() {
         log.info("This broker:{} is setting the role from {} to {}",
-                pulsar.getLookupServiceAddress(), role, Leader);
+                pulsar.getBrokerId(), role, Leader);
         int retry = 0;
         while (!Thread.currentThread().isInterrupted()) {
             try {
@@ -807,7 +808,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
                 break;
             } catch (Throwable e) {
                 log.error("The broker:{} failed to set the role. Retrying {} th ...",
-                        pulsar.getLookupServiceAddress(), ++retry, e);
+                        pulsar.getBrokerId(), ++retry, e);
                 try {
                     Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
                 } catch (InterruptedException ex) {
@@ -818,7 +819,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
             }
         }
         role = Leader;
-        log.info("This broker:{} plays the leader now.", pulsar.getLookupServiceAddress());
+        log.info("This broker:{} plays the leader now.", pulsar.getBrokerId());
 
         // flush the load data when the leader is elected.
         brokerLoadDataReporter.reportAsync(true);
@@ -828,7 +829,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
     @VisibleForTesting
     void playFollower() {
         log.info("This broker:{} is setting the role from {} to {}",
-                pulsar.getLookupServiceAddress(), role, Follower);
+                pulsar.getBrokerId(), role, Follower);
         int retry = 0;
         while (!Thread.currentThread().isInterrupted()) {
             try {
@@ -841,7 +842,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
                 break;
             } catch (Throwable e) {
                 log.error("The broker:{} failed to set the role. Retrying {} th ...",
-                        pulsar.getLookupServiceAddress(), ++retry, e);
+                        pulsar.getBrokerId(), ++retry, e);
                 try {
                     Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
                 } catch (InterruptedException ex) {
@@ -852,7 +853,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
             }
         }
         role = Follower;
-        log.info("This broker:{} plays a follower now.", pulsar.getLookupServiceAddress());
+        log.info("This broker:{} plays a follower now.", pulsar.getBrokerId());
 
         // flush the load data when the leader is elected.
         brokerLoadDataReporter.reportAsync(true);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index bd571284346..8704509c17c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.PulsarClusterMetadataSetup;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LeaderBroker;
 import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
 import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
 import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
@@ -124,7 +125,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
     private final ServiceConfiguration config;
     private final Schema<ServiceUnitStateData> schema;
     private final Map<String, CompletableFuture<String>> getOwnerRequests;
-    private final String lookupServiceAddress;
+    private final String brokerId;
     private final Map<String, CompletableFuture<Void>> cleanupJobs;
     private final StateChangeListeners stateChangeListeners;
     private ExtensibleLoadManagerImpl loadManager;
@@ -201,7 +202,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
     public ServiceUnitStateChannelImpl(PulsarService pulsar) {
         this.pulsar = pulsar;
         this.config = pulsar.getConfig();
-        this.lookupServiceAddress = pulsar.getLookupServiceAddress();
+        this.brokerId = pulsar.getBrokerId();
         this.schema = Schema.JSON(ServiceUnitStateData.class);
         this.getOwnerRequests = new ConcurrentHashMap<>();
         this.cleanupJobs = new ConcurrentHashMap<>();
@@ -249,7 +250,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
                             },
                             0, ownershipMonitorDelayTimeInSecs, SECONDS);
             log.info("This leader broker:{} started the ownership monitor.",
-                    lookupServiceAddress);
+                    brokerId);
         }
     }
 
@@ -258,13 +259,13 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
             monitorTask.cancel(false);
             monitorTask = null;
             log.info("This previous leader broker:{} stopped the ownership monitor.",
-                    lookupServiceAddress);
+                    brokerId);
         }
     }
 
     @Override
     public void cleanOwnerships() {
-        doCleanup(lookupServiceAddress);
+        doCleanup(brokerId);
     }
 
     public synchronized void start() throws PulsarServerException {
@@ -430,19 +431,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
                     new IllegalStateException("Invalid channel state:" + channelState.name()));
         }
 
-        return leaderElectionService.readCurrentLeader().thenApply(leader -> {
-                    //expecting http://broker-xyz:port
-                    // TODO: discard this protocol prefix removal
-                    //  by a util func that returns lookupServiceAddress(serviceUrl)
-                    if (leader.isPresent()) {
-                        String broker = leader.get().getServiceUrl();
-                        broker = broker.substring(broker.lastIndexOf('/') + 1);
-                        return Optional.of(broker);
-                    } else {
-                        return Optional.empty();
-                    }
-                }
-        );
+        return leaderElectionService.readCurrentLeader()
+                .thenApply(leader -> leader.map(LeaderBroker::getBrokerId));
     }
 
     public CompletableFuture<Boolean> isChannelOwnerAsync() {
@@ -484,7 +474,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
     }
 
     public boolean isOwner(String serviceUnit) {
-        return isOwner(serviceUnit, lookupServiceAddress);
+        return isOwner(serviceUnit, brokerId);
     }
 
     private CompletableFuture<Optional<String>> getActiveOwnerAsync(
@@ -672,7 +662,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         long totalHandledRequests = getHandlerTotalCounter(data).incrementAndGet();
         if (debug()) {
             log.info("{} received a handle request for serviceUnit:{}, data:{}. totalHandledRequests:{}",
-                    lookupServiceAddress, serviceUnit, data, totalHandledRequests);
+                    brokerId, serviceUnit, data, totalHandledRequests);
         }
 
         ServiceUnitState state = state(data);
@@ -736,7 +726,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
                 long handlerFailureCount = getHandlerFailureCounter(data).get();
                 log.info("{} handled {} event for serviceUnit:{}, cur:{}, next:{}, "
                                 + "totalHandledRequests:{}, totalFailedRequests:{}",
-                        lookupServiceAddress, getLogEventTag(data), serviceUnit,
+                        brokerId, getLogEventTag(data), serviceUnit,
                         data == null ? "" : data,
                         next == null ? "" : next,
                         handlerTotalCount, handlerFailureCount
@@ -747,7 +737,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
             long handlerFailureCount = getHandlerFailureCounter(data).incrementAndGet();
             log.error("{} failed to handle {} event for serviceUnit:{}, cur:{}, next:{}, "
                             + "totalHandledRequests:{}, totalFailedRequests:{}",
-                    lookupServiceAddress, getLogEventTag(data), serviceUnit,
+                    brokerId, getLogEventTag(data), serviceUnit,
                     data == null ? "" : data,
                     next == null ? "" : next,
                     handlerTotalCount, handlerFailureCount,
@@ -885,7 +875,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         if (broker == null) {
             return false;
         }
-        return broker.equals(lookupServiceAddress);
+        return broker.equals(brokerId);
     }
 
     private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
@@ -1291,7 +1281,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
                     MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS);
                 } catch (InterruptedException e) {
                     log.warn("Interrupted while delaying the next service unit clean-up. Cleaning broker:{}",
-                            lookupServiceAddress);
+                            brokerId);
                 }
             }
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java
index 468552db541..56238d6528e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java
@@ -42,14 +42,14 @@ public class IsolationPoliciesHelper {
         return LoadManagerShared.applyNamespacePoliciesAsync(serviceUnit, policies,
                 availableBrokers.keySet(), new LoadManagerShared.BrokerTopicLoadingPredicate() {
                     @Override
-                    public boolean isEnablePersistentTopics(String brokerUrl) {
-                        BrokerLookupData lookupData = availableBrokers.get(brokerUrl.replace("http://", ""));
+                    public boolean isEnablePersistentTopics(String brokerId) {
+                        BrokerLookupData lookupData = availableBrokers.get(brokerId);
                         return lookupData != null && lookupData.persistentTopicsEnabled();
                     }
 
                     @Override
-                    public boolean isEnableNonPersistentTopics(String brokerUrl) {
-                        BrokerLookupData lookupData = availableBrokers.get(brokerUrl.replace("http://", ""));
+                    public boolean isEnableNonPersistentTopics(String brokerId) {
+                        BrokerLookupData lookupData = availableBrokers.get(brokerId);
                         return lookupData != null && lookupData.nonPersistentTopicsEnabled();
                     }
                 });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java
index b07acfda7f7..3061969120b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java
@@ -55,7 +55,7 @@ public class BrokerLoadDataReporter implements LoadDataReporter<BrokerLoadData>,
 
     private final BrokerHostUsage brokerHostUsage;
 
-    private final String lookupServiceAddress;
+    private final String brokerId;
 
     @Getter
     private final BrokerLoadData localData;
@@ -67,10 +67,10 @@ public class BrokerLoadDataReporter implements LoadDataReporter<BrokerLoadData>,
     private long tombstoneDelayInMillis;
 
     public BrokerLoadDataReporter(PulsarService pulsar,
-                                  String lookupServiceAddress,
+                                  String brokerId,
                                   LoadDataStore<BrokerLoadData> brokerLoadDataStore) {
         this.brokerLoadDataStore = brokerLoadDataStore;
-        this.lookupServiceAddress = lookupServiceAddress;
+        this.brokerId = brokerId;
         this.pulsar = pulsar;
         this.conf = this.pulsar.getConfiguration();
         if (SystemUtils.IS_OS_LINUX) {
@@ -111,7 +111,7 @@ public class BrokerLoadDataReporter implements LoadDataReporter<BrokerLoadData>,
                 log.info("publishing load report:{}", localData.toString(conf));
             }
             CompletableFuture<Void> future =
-                    this.brokerLoadDataStore.pushAsync(this.lookupServiceAddress, newLoadData);
+                    this.brokerLoadDataStore.pushAsync(this.brokerId, newLoadData);
             future.whenComplete((__, ex) -> {
                 if (ex == null) {
                     localData.setReportedAt(System.currentTimeMillis());
@@ -185,7 +185,7 @@ public class BrokerLoadDataReporter implements LoadDataReporter<BrokerLoadData>,
         }
         var lastSuccessfulTombstonedAt = lastTombstonedAt;
         lastTombstonedAt = now; // dedup first
-        brokerLoadDataStore.removeAsync(lookupServiceAddress)
+        brokerLoadDataStore.removeAsync(brokerId)
                 .whenComplete((__, e) -> {
                             if (e != null) {
                                 log.error("Failed to clean broker load data.", e);
@@ -209,13 +209,13 @@ public class BrokerLoadDataReporter implements LoadDataReporter<BrokerLoadData>,
         ServiceUnitState state = ServiceUnitStateData.state(data);
         switch (state) {
             case Releasing, Splitting -> {
-                if (StringUtils.equals(data.sourceBroker(), lookupServiceAddress)) {
+                if (StringUtils.equals(data.sourceBroker(), brokerId)) {
                     localData.clear();
                     tombstone();
                 }
             }
             case Owned -> {
-                if (StringUtils.equals(data.dstBroker(), lookupServiceAddress)) {
+                if (StringUtils.equals(data.dstBroker(), brokerId)) {
                     localData.clear();
                     tombstone();
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java
index 0fa37d3687c..43e05ad1ac9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java
@@ -41,7 +41,7 @@ public class TopBundleLoadDataReporter implements LoadDataReporter<TopBundlesLoa
 
     private final PulsarService pulsar;
 
-    private final String lookupServiceAddress;
+    private final String brokerId;
 
     private final LoadDataStore<TopBundlesLoadData> bundleLoadDataStore;
 
@@ -53,10 +53,10 @@ public class TopBundleLoadDataReporter implements LoadDataReporter<TopBundlesLoa
     private long tombstoneDelayInMillis;
 
     public TopBundleLoadDataReporter(PulsarService pulsar,
-                                     String lookupServiceAddress,
+                                     String brokerId,
                                      LoadDataStore<TopBundlesLoadData> bundleLoadDataStore) {
         this.pulsar = pulsar;
-        this.lookupServiceAddress = lookupServiceAddress;
+        this.brokerId = brokerId;
         this.bundleLoadDataStore = bundleLoadDataStore;
         this.lastBundleStatsUpdatedAt = 0;
         this.topKBundles = new TopKBundles(pulsar);
@@ -88,7 +88,7 @@ public class TopBundleLoadDataReporter implements LoadDataReporter<TopBundlesLoa
             if (ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log)) {
                 log.info("Reporting TopBundlesLoadData:{}", topKBundles.getLoadData());
             }
-            return this.bundleLoadDataStore.pushAsync(lookupServiceAddress, topKBundles.getLoadData())
+            return this.bundleLoadDataStore.pushAsync(brokerId, topKBundles.getLoadData())
                     .exceptionally(e -> {
                         log.error("Failed to report top-bundles load data.", e);
                         return null;
@@ -106,7 +106,7 @@ public class TopBundleLoadDataReporter implements LoadDataReporter<TopBundlesLoa
         }
         var lastSuccessfulTombstonedAt = lastTombstonedAt;
         lastTombstonedAt = now; // dedup first
-        bundleLoadDataStore.removeAsync(lookupServiceAddress)
+        bundleLoadDataStore.removeAsync(brokerId)
                 .whenComplete((__, e) -> {
                             if (e != null) {
                                 log.error("Failed to clean broker load data.", e);
@@ -129,12 +129,12 @@ public class TopBundleLoadDataReporter implements LoadDataReporter<TopBundlesLoa
         ServiceUnitState state = ServiceUnitStateData.state(data);
         switch (state) {
             case Releasing, Splitting -> {
-                if (StringUtils.equals(data.sourceBroker(), lookupServiceAddress)) {
+                if (StringUtils.equals(data.sourceBroker(), brokerId)) {
                     tombstone();
                 }
             }
             case Owned -> {
-                if (StringUtils.equals(data.dstBroker(), lookupServiceAddress)) {
+                if (StringUtils.equals(data.dstBroker(), brokerId)) {
                     tombstone();
                 }
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 5f2e4b1f25d..3d627db6cfa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -21,8 +21,6 @@ package org.apache.pulsar.broker.loadbalance.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 import io.netty.util.concurrent.FastThreadLocal;
-import java.net.MalformedURLException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -106,59 +104,56 @@ public class LoadManagerShared {
         if (isIsolationPoliciesPresent) {
             LOG.debug("Isolation Policies Present for namespace - [{}]", namespace.toString());
         }
-        for (final String broker : availableBrokers) {
-            final String brokerUrlString = String.format("http://%s", broker);
-            URL brokerUrl;
+        for (final String brokerId : availableBrokers) {
+            String brokerHost;
             try {
-                brokerUrl = new URL(brokerUrlString);
-            } catch (MalformedURLException e) {
-                LOG.error("Unable to parse brokerUrl from ResourceUnitId", e);
+                brokerHost = parseBrokerHost(brokerId);
+            } catch (IllegalArgumentException e) {
+                LOG.error("Unable to parse host from {}", brokerId, e);
                 continue;
             }
             // todo: in future check if the resource unit has resources to take the namespace
             if (isIsolationPoliciesPresent) {
                 // note: serviceUnitID is namespace name and ResourceID is brokerName
-                if (policies.isPrimaryBroker(namespace, brokerUrl.getHost())) {
-                    primariesCache.add(broker);
+                if (policies.isPrimaryBroker(namespace, brokerHost)) {
+                    primariesCache.add(brokerId);
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Added Primary Broker - [{}] as possible Candidates for"
-                                + " namespace - [{}] with policies", brokerUrl.getHost(), namespace.toString());
+                                + " namespace - [{}] with policies", brokerHost, namespace.toString());
                     }
-                } else if (policies.isSecondaryBroker(namespace, brokerUrl.getHost())) {
-                    secondaryCache.add(broker);
+                } else if (policies.isSecondaryBroker(namespace, brokerHost)) {
+                    secondaryCache.add(brokerId);
                     if (LOG.isDebugEnabled()) {
                         LOG.debug(
                                 "Added Shared Broker - [{}] as possible "
                                         + "Candidates for namespace - [{}] with policies",
-                                brokerUrl.getHost(), namespace.toString());
+                                brokerHost, namespace.toString());
                     }
                 } else {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Skipping Broker - [{}] not primary broker and not shared" + " for namespace - [{}] ",
-                                brokerUrl.getHost(), namespace.toString());
+                                brokerHost, namespace.toString());
                     }
 
                 }
             } else {
                 // non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
-                if (isNonPersistentTopic
-                        && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
+                if (isNonPersistentTopic && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerId)) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Filter broker- [{}] because it doesn't support non-persistent namespace - [{}]",
-                                brokerUrl.getHost(), namespace.toString());
+                                brokerHost, namespace.toString());
                     }
-                } else if (!isNonPersistentTopic
-                        && !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
+                } else if (!isNonPersistentTopic && !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerId)) {
                     // persistent topic can be assigned to only brokers that enabled for persistent-topic
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Filter broker- [{}] because broker only supports non-persistent namespace - [{}]",
-                                brokerUrl.getHost(), namespace.toString());
+                                brokerHost, namespace.toString());
                     }
-                } else if (policies.isSharedBroker(brokerUrl.getHost())) {
-                    secondaryCache.add(broker);
+                } else if (policies.isSharedBroker(brokerHost)) {
+                    secondaryCache.add(brokerId);
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]",
-                                brokerUrl.getHost(), namespace.toString());
+                                brokerHost, namespace.toString());
                     }
                 }
             }
@@ -181,6 +176,16 @@ public class LoadManagerShared {
         }
     }
 
+    private static String parseBrokerHost(String brokerId) {
+        // use last index to support ipv6 addresses
+        int lastIdx = brokerId.lastIndexOf(':');
+        if (lastIdx > -1) {
+            return brokerId.substring(0, lastIdx);
+        } else {
+            throw new IllegalArgumentException("Invalid brokerId: " + brokerId);
+        }
+    }
+
     public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
             final ServiceUnitId serviceUnit, final SimpleResourceAllocationPolicies policies,
             final Set<String> availableBrokers, final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
@@ -199,59 +204,57 @@ public class LoadManagerShared {
                     LOG.debug("Isolation Policies Present for namespace - [{}]", namespace.toString());
                 }
             }
-            for (final String broker : availableBrokers) {
-                final String brokerUrlString = String.format("http://%s", broker);
-                URL brokerUrl;
+            for (final String brokerId : availableBrokers) {
+                String brokerHost;
                 try {
-                    brokerUrl = new URL(brokerUrlString);
-                } catch (MalformedURLException e) {
-                    LOG.error("Unable to parse brokerUrl from ResourceUnitId", e);
+                    brokerHost = parseBrokerHost(brokerId);
+                } catch (IllegalArgumentException e) {
+                    LOG.error("Unable to parse host from {}", brokerId, e);
                     continue;
                 }
                 // todo: in future check if the resource unit has resources to take the namespace
                 if (isIsolationPoliciesPresent) {
                     // note: serviceUnitID is namespace name and ResourceID is brokerName
-                    if (policies.isPrimaryBroker(namespace, brokerUrl.getHost())) {
-                        primariesCache.add(broker);
+                    if (policies.isPrimaryBroker(namespace, brokerHost)) {
+                        primariesCache.add(brokerId);
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Added Primary Broker - [{}] as possible Candidates for"
-                                    + " namespace - [{}] with policies", brokerUrl.getHost(), namespace.toString());
+                                    + " namespace - [{}] with policies", brokerHost, namespace.toString());
                         }
-                    } else if (policies.isSecondaryBroker(namespace, brokerUrl.getHost())) {
-                        secondaryCache.add(broker);
+                    } else if (policies.isSecondaryBroker(namespace, brokerHost)) {
+                        secondaryCache.add(brokerId);
                         if (LOG.isDebugEnabled()) {
                             LOG.debug(
                                     "Added Shared Broker - [{}] as possible "
                                             + "Candidates for namespace - [{}] with policies",
-                                    brokerUrl.getHost(), namespace.toString());
+                                    brokerHost, namespace.toString());
                         }
                     } else {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Skipping Broker - [{}] not primary broker and not shared"
-                                            + " for namespace - [{}] ", brokerUrl.getHost(), namespace.toString());
+                                            + " for namespace - [{}] ", brokerHost, namespace.toString());
                         }
 
                     }
                 } else {
                     // non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
-                    if (isNonPersistentTopic
-                            && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
+                    if (isNonPersistentTopic && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerId)) {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Filter broker- [{}] because it doesn't support non-persistent namespace - [{}]",
-                                    brokerUrl.getHost(), namespace.toString());
+                                    brokerId, namespace.toString());
                         }
-                    } else if (!isNonPersistentTopic
-                            && !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
+                    } else if (!isNonPersistentTopic && !brokerTopicLoadingPredicate
+                            .isEnablePersistentTopics(brokerId)) {
                         // persistent topic can be assigned to only brokers that enabled for persistent-topic
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Filter broker- [{}] because broker only supports non-persistent "
-                                            + "namespace - [{}]", brokerUrl.getHost(), namespace.toString());
+                                            + "namespace - [{}]", brokerId, namespace.toString());
                         }
-                    } else if (policies.isSharedBroker(brokerUrl.getHost())) {
-                        secondaryCache.add(broker);
+                    } else if (policies.isSharedBroker(brokerHost)) {
+                        secondaryCache.add(brokerId);
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]",
-                                    brokerUrl.getHost(), namespace.toString());
+                                    brokerHost, namespace.toString());
                         }
                     }
                 }
@@ -762,9 +765,9 @@ public class LoadManagerShared {
     }
 
     public interface BrokerTopicLoadingPredicate {
-        boolean isEnablePersistentTopics(String brokerUrl);
+        boolean isEnablePersistentTopics(String brokerId);
 
-        boolean isEnableNonPersistentTopics(String brokerUrl);
+        boolean isEnableNonPersistentTopics(String brokerId);
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 4ecdfefbdd0..974d75d60b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -208,15 +208,15 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
         this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
         this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
             @Override
-            public boolean isEnablePersistentTopics(String brokerUrl) {
-                final BrokerData brokerData = loadData.getBrokerData().get(brokerUrl.replace("http://", ""));
+            public boolean isEnablePersistentTopics(String brokerId) {
+                final BrokerData brokerData = loadData.getBrokerData().get(brokerId);
                 return brokerData != null && brokerData.getLocalData() != null
                         && brokerData.getLocalData().isPersistentTopicsEnabled();
             }
 
             @Override
-            public boolean isEnableNonPersistentTopics(String brokerUrl) {
-                final BrokerData brokerData = loadData.getBrokerData().get(brokerUrl.replace("http://", ""));
+            public boolean isEnableNonPersistentTopics(String brokerId) {
+                final BrokerData brokerData = loadData.getBrokerData().get(brokerId);
                 return brokerData != null && brokerData.getLocalData() != null
                         && brokerData.getLocalData().isNonPersistentTopicsEnabled();
             }
@@ -977,14 +977,14 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
             localData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
             localData.setLoadManagerClassName(conf.getLoadManagerClassName());
 
-            String lookupServiceAddress = pulsar.getLookupServiceAddress();
-            brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+            String brokerId = pulsar.getBrokerId();
+            brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + brokerId;
             updateLocalBrokerData();
 
             brokerDataLock = brokersData.acquireLock(brokerZnodePath, localData).join();
             pulsarResources.getLoadBalanceResources()
                     .getBrokerTimeAverageDataResources()
-                    .updateTimeAverageBrokerData(lookupServiceAddress, new TimeAverageBrokerData())
+                    .updateTimeAverageBrokerData(brokerId, new TimeAverageBrokerData())
                     .join();
             updateAll();
         } catch (Exception e) {
@@ -1212,7 +1212,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
         if (StringUtils.isBlank(broker)) {
             return this.bundleBrokerAffinityMap.remove(bundle);
         }
-        broker = broker.replaceFirst("http[s]?://", "");
         return this.bundleBrokerAffinityMap.put(bundle, broker);
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index 63bc7ab07fe..c8d81bda1bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.loadbalance.impl;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -32,7 +31,6 @@ import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
-import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 
 /**
  * Wrapper class allowing classes of instance ModularLoadManager to be compatible with the interface LoadManager.
@@ -75,20 +73,6 @@ public class ModularLoadManagerWrapper implements LoadManager {
         return leastLoadedBroker.map(this::buildBrokerResourceUnit);
     }
 
-    private String getBrokerWebServiceUrl(String broker) {
-        LocalBrokerData localData = (loadManager).getBrokerLocalData(broker);
-        if (localData != null) {
-            return localData.getWebServiceUrlTls() != null ? localData.getWebServiceUrlTls()
-                    : localData.getWebServiceUrl();
-        }
-        return String.format("http://%s", broker);
-    }
-
-    private String getBrokerZnodeName(String broker, String webServiceUrl) {
-        String scheme = webServiceUrl.substring(0, webServiceUrl.indexOf("://"));
-        return String.format("%s://%s", scheme, broker);
-    }
-
     @Override
     public List<Metrics> getLoadBalancingMetrics() {
         return loadManager.getLoadBalancingMetrics();
@@ -149,10 +133,7 @@ public class ModularLoadManagerWrapper implements LoadManager {
     }
 
     private SimpleResourceUnit buildBrokerResourceUnit (String broker) {
-        String webServiceUrl = getBrokerWebServiceUrl(broker);
-        String brokerZnodeName = getBrokerZnodeName(broker, webServiceUrl);
-        return new SimpleResourceUnit(webServiceUrl,
-                new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
+        return new SimpleResourceUnit(broker, new PulsarResourceDescription());
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index ee60595a485..be0580808ca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -211,15 +211,15 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
                         .build();
         this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
             @Override
-            public boolean isEnablePersistentTopics(String brokerUrl) {
-                ResourceUnit ru = new SimpleResourceUnit(brokerUrl, new PulsarResourceDescription());
+            public boolean isEnablePersistentTopics(String brokerId) {
+                ResourceUnit ru = new SimpleResourceUnit(brokerId, new PulsarResourceDescription());
                 LoadReport loadReport = currentLoadReports.get(ru);
                 return loadReport != null && loadReport.isPersistentTopicsEnabled();
             }
 
             @Override
-            public boolean isEnableNonPersistentTopics(String brokerUrl) {
-                ResourceUnit ru = new SimpleResourceUnit(brokerUrl, new PulsarResourceDescription());
+            public boolean isEnableNonPersistentTopics(String brokerId) {
+                ResourceUnit ru = new SimpleResourceUnit(brokerId, new PulsarResourceDescription());
                 LoadReport loadReport = currentLoadReports.get(ru);
                 return loadReport != null && loadReport.isNonPersistentTopicsEnabled();
             }
@@ -266,8 +266,8 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
     @Override
     public void start() throws PulsarServerException {
         // Register the brokers in metadata store
-        String lookupServiceAddress = pulsar.getLookupServiceAddress();
-        String brokerLockPath = LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+        String brokerId = pulsar.getBrokerId();
+        String brokerLockPath = LOADBALANCE_BROKERS_ROOT + "/" + brokerId;
 
         try {
             LoadReport loadReport = null;
@@ -653,7 +653,6 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
      */
     private synchronized void doLoadRanking() {
         ResourceUnitRanking.setCpuUsageByMsgRate(this.realtimeCpuLoadFactor);
-        String hostname = pulsar.getAdvertisedAddress();
         String strategy = this.getLoadBalancerPlacementStrategy();
         log.info("doLoadRanking - load balancing strategy: {}", strategy);
         if (!currentLoadReports.isEmpty()) {
@@ -702,8 +701,8 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
                 }
 
                 // update metrics
-                if (resourceUnit.getResourceId().contains(hostname)) {
-                    updateLoadBalancingMetrics(hostname, finalRank, ranking);
+                if (resourceUnit.getResourceId().equals(pulsar.getBrokerId())) {
+                    updateLoadBalancingMetrics(pulsar.getAdvertisedAddress(), finalRank, ranking);
                 }
             }
             updateBrokerToNamespaceToBundle();
@@ -711,7 +710,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
             this.resourceUnitRankings = newResourceUnitRankings;
         } else {
             log.info("Leader broker[{}] No ResourceUnits to rank this run, Using Old Ranking",
-                    pulsar.getSafeWebServiceAddress());
+                    pulsar.getBrokerId());
         }
     }
 
@@ -855,7 +854,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
                 // Add preallocated bundle range so incoming bundles from the same namespace are not assigned to the
                 // same broker.
                 brokerToNamespaceToBundleRange
-                        .computeIfAbsent(selectedRU.getResourceId().replace("http://", ""),
+                        .computeIfAbsent(selectedRU.getResourceId(),
                                 k -> ConcurrentOpenHashMap.<String,
                                         ConcurrentOpenHashSet<String>>newBuilder()
                                         .build())
@@ -876,7 +875,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
             availableBrokersCache.clear();
             for (final Set<ResourceUnit> resourceUnits : availableBrokers.values()) {
                 for (final ResourceUnit resourceUnit : resourceUnits) {
-                    availableBrokersCache.add(resourceUnit.getResourceId().replace("http://", ""));
+                    availableBrokersCache.add(resourceUnit.getResourceId());
                 }
             }
             brokerCandidateCache.clear();
@@ -899,7 +898,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
                 final Long rank = entry.getKey();
                 final Set<ResourceUnit> resourceUnits = entry.getValue();
                 for (final ResourceUnit resourceUnit : resourceUnits) {
-                    if (brokerCandidateCache.contains(resourceUnit.getResourceId().replace("http://", ""))) {
+                    if (brokerCandidateCache.contains(resourceUnit.getResourceId())) {
                         result.put(rank, resourceUnit);
                     }
                 }
@@ -928,8 +927,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
 
             availableBrokers = new HashMap<>();
             for (String broker : activeBrokers) {
-                ResourceUnit resourceUnit = new SimpleResourceUnit(String.format("http://%s", broker),
-                        new PulsarResourceDescription());
+                ResourceUnit resourceUnit = new SimpleResourceUnit(broker, new PulsarResourceDescription());
                 availableBrokers.computeIfAbsent(0L, key -> new TreeSet<>()).add(resourceUnit);
             }
             log.info("Choosing at random from broker list: [{}]", availableBrokers.values());
@@ -956,7 +954,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
             Iterator<Map.Entry<Long, ResourceUnit>> candidateIterator = finalCandidates.entries().iterator();
             while (candidateIterator.hasNext()) {
                 Map.Entry<Long, ResourceUnit> candidate = candidateIterator.next();
-                String candidateBrokerName = candidate.getValue().getResourceId().replace("http://", "");
+                String candidateBrokerName = candidate.getValue().getResourceId();
                 if (!activeBrokers.contains(candidateBrokerName)) {
                     candidateIterator.remove(); // Current candidate points to an inactive broker, so remove it
                 }
@@ -1005,8 +1003,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
                     try {
                         String key = String.format("%s/%s", LOADBALANCE_BROKERS_ROOT, broker);
                         LoadReport lr = loadReports.readLock(key).join().get();
-                        ResourceUnit ru = new SimpleResourceUnit(String.format("http://%s", lr.getName()),
-                                fromLoadReport(lr));
+                        ResourceUnit ru = new SimpleResourceUnit(lr.getName(), fromLoadReport(lr));
                         this.currentLoadReports.put(ru, lr);
                     } catch (Exception e) {
                         log.warn("Error reading load report from Cache for broker - [{}], [{}]", broker, e);
@@ -1078,7 +1075,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
                 loadReport.setProtocols(pulsar.getProtocolDataToAdvertise());
                 loadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
                 loadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
-                loadReport.setName(pulsar.getLookupServiceAddress());
+                loadReport.setName(pulsar.getBrokerId());
                 loadReport.setBrokerVersionString(pulsar.getBrokerVersion());
 
                 SystemResourceUsage systemResourceUsage = this.getSystemResourceUsage();
@@ -1121,8 +1118,8 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
                 loadReport.setAllocatedMsgRateIn(allocatedQuota.getMsgRateIn());
                 loadReport.setAllocatedMsgRateOut(allocatedQuota.getMsgRateOut());
 
-                final ResourceUnit resourceUnit = new SimpleResourceUnit(
-                        String.format("http://%s", loadReport.getName()), fromLoadReport(loadReport));
+                final ResourceUnit resourceUnit =
+                        new SimpleResourceUnit(loadReport.getName(), fromLoadReport(loadReport));
                 Set<String> preAllocatedBundles;
                 if (resourceUnitRankings.containsKey(resourceUnit)) {
                     preAllocatedBundles = resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles();
@@ -1277,7 +1274,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
             final Set<String> preallocatedBundles = resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles();
             final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
                     brokerToNamespaceToBundleRange
-                            .computeIfAbsent(broker.replace("http://", ""),
+                            .computeIfAbsent(broker,
                                     k -> ConcurrentOpenHashMap.<String,
                                             ConcurrentOpenHashSet<String>>newBuilder()
                                             .build());
@@ -1455,7 +1452,6 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
         if (StringUtils.isBlank(broker)) {
             return this.bundleBrokerAffinityMap.remove(bundle);
         }
-        broker = broker.replaceFirst("http[s]?://", "");
         return this.bundleBrokerAffinityMap.put(bundle, broker);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 4a54d4e0908..d8c3fd169a2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -192,7 +192,7 @@ public class NamespaceService implements AutoCloseable {
                     return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
                         if (optResult.isPresent()) {
                             LOG.info("[{}] Redirect lookup request to {} for topic {}",
-                                    pulsar.getSafeWebServiceAddress(), optResult.get(), topic);
+                                    pulsar.getBrokerId(), optResult.get(), topic);
                             return CompletableFuture.completedFuture(optResult);
                         }
                         if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
@@ -298,7 +298,7 @@ public class NamespaceService implements AutoCloseable {
         return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
             if (optResult.isPresent()) {
                 LOG.info("[{}] Redirect lookup request to {} for topic {}",
-                        pulsar.getSafeWebServiceAddress(), optResult.get(), topic);
+                        pulsar.getBrokerId(), optResult.get(), topic);
                 try {
                     LookupData lookupData = optResult.get().getLookupData();
                     final String redirectUrl = options.isRequestHttps()
@@ -338,17 +338,17 @@ public class NamespaceService implements AutoCloseable {
      * @throws PulsarServerException if an unexpected error occurs
      */
     public void registerBootstrapNamespaces() throws PulsarServerException {
-        String lookupServiceAddress = pulsar.getLookupServiceAddress();
+        String brokerId = pulsar.getBrokerId();
         // ensure that we own the heartbeat namespace
-        if (registerNamespace(getHeartbeatNamespace(lookupServiceAddress, config), true)) {
+        if (registerNamespace(getHeartbeatNamespace(brokerId, config), true)) {
             LOG.info("added heartbeat namespace name in local cache: ns={}",
-                    getHeartbeatNamespace(lookupServiceAddress, config));
+                    getHeartbeatNamespace(brokerId, config));
         }
 
         // ensure that we own the heartbeat namespace
-        if (registerNamespace(getHeartbeatNamespaceV2(lookupServiceAddress, config), true)) {
+        if (registerNamespace(getHeartbeatNamespaceV2(brokerId, config), true)) {
             LOG.info("added heartbeat namespace name in local cache: ns={}",
-                    getHeartbeatNamespaceV2(lookupServiceAddress, config));
+                    getHeartbeatNamespaceV2(brokerId, config));
         }
 
         // we may not need strict ownership checking for bootstrap names for now
@@ -506,7 +506,6 @@ public class NamespaceService implements AutoCloseable {
             return;
         }
         String candidateBroker;
-        String candidateBrokerAdvertisedAddr = null;
 
         LeaderElectionService les = pulsar.getLeaderElectionService();
         if (les == null) {
@@ -541,14 +540,14 @@ public class NamespaceService implements AutoCloseable {
 
                 if (options.isAuthoritative()) {
                     // leader broker already assigned the current broker as owner
-                    candidateBroker = pulsar.getSafeWebServiceAddress();
+                    candidateBroker = pulsar.getBrokerId();
                 } else {
                     LoadManager loadManager = this.loadManager.get();
                     boolean makeLoadManagerDecisionOnThisBroker = !loadManager.isCentralized() || les.isLeader();
                     if (!makeLoadManagerDecisionOnThisBroker) {
                         // If leader is not active, fallback to pick the least loaded from current broker loadmanager
                         boolean leaderBrokerActive = currentLeader.isPresent()
-                                && isBrokerActive(currentLeader.get().getServiceUrl());
+                                && isBrokerActive(currentLeader.get().getBrokerId());
                         if (!leaderBrokerActive) {
                             makeLoadManagerDecisionOnThisBroker = true;
                             if (currentLeader.isEmpty()) {
@@ -567,7 +566,7 @@ public class NamespaceService implements AutoCloseable {
                         }
                     }
                     if (makeLoadManagerDecisionOnThisBroker) {
-                        Optional<Pair<String, String>> availableBroker = getLeastLoadedFromLoadManager(bundle);
+                        Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
                         if (availableBroker.isEmpty()) {
                             LOG.warn("Load manager didn't return any available broker. "
                                             + "Returning empty result to lookup. NamespaceBundle[{}]",
@@ -575,12 +574,11 @@ public class NamespaceService implements AutoCloseable {
                             lookupFuture.complete(Optional.empty());
                             return;
                         }
-                        candidateBroker = availableBroker.get().getLeft();
-                        candidateBrokerAdvertisedAddr = availableBroker.get().getRight();
+                        candidateBroker = availableBroker.get();
                         authoritativeRedirect = true;
                     } else {
                         // forward to leader broker to make assignment
-                        candidateBroker = currentLeader.get().getServiceUrl();
+                        candidateBroker = currentLeader.get().getBrokerId();
                     }
                 }
             }
@@ -593,7 +591,7 @@ public class NamespaceService implements AutoCloseable {
         try {
             Objects.requireNonNull(candidateBroker);
 
-            if (candidateBroker.equals(pulsar.getSafeWebServiceAddress())) {
+            if (candidateBroker.equals(pulsar.getBrokerId())) {
                 // Load manager decided that the local broker should try to become the owner
                 ownershipCache.tryAcquiringOwnership(bundle).thenAccept(ownerInfo -> {
                     if (ownerInfo.isDisabled()) {
@@ -644,8 +642,7 @@ public class NamespaceService implements AutoCloseable {
                 }
 
                 // Now setting the redirect url
-                createLookupResult(candidateBrokerAdvertisedAddr == null ? candidateBroker
-                        : candidateBrokerAdvertisedAddr, authoritativeRedirect, options.getAdvertisedListenerName())
+                createLookupResult(candidateBroker, authoritativeRedirect, options.getAdvertisedListenerName())
                         .thenAccept(lookupResult -> lookupFuture.complete(Optional.of(lookupResult)))
                         .exceptionally(ex -> {
                             lookupFuture.completeExceptionally(ex);
@@ -665,7 +662,7 @@ public class NamespaceService implements AutoCloseable {
         CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
         try {
             checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null %s", candidateBroker);
-            String path = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + parseHostAndPort(candidateBroker);
+            String path = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + candidateBroker;
 
             localBrokerDataCache.get(path).thenAccept(reportData -> {
                 if (reportData.isPresent()) {
@@ -702,29 +699,19 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public boolean isBrokerActive(String candidateBroker) {
-        String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
         Set<String> availableBrokers = getAvailableBrokers();
-        if (availableBrokers.contains(candidateBrokerHostAndPort)) {
+        if (availableBrokers.contains(candidateBroker)) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Broker {} ({}) is available for.", candidateBroker, candidateBrokerHostAndPort);
+                LOG.debug("Broker {} is available for.", candidateBroker);
             }
             return true;
         } else {
-            LOG.warn("Broker {} ({}) couldn't be found in available brokers {}",
-                    candidateBroker, candidateBrokerHostAndPort,
-                    String.join(",", availableBrokers));
+            LOG.warn("Broker {} couldn't be found in available brokers {}",
+                    candidateBroker, String.join(",", availableBrokers));
             return false;
         }
     }
 
-    private static String parseHostAndPort(String candidateBroker) {
-        int uriSeparatorPos = candidateBroker.indexOf("://");
-        if (uriSeparatorPos == -1) {
-            throw new IllegalArgumentException("'" + candidateBroker + "' isn't an URI.");
-        }
-        return candidateBroker.substring(uriSeparatorPos + 3);
-    }
-
     private Set<String> getAvailableBrokers() {
         try {
             return loadManager.get().getAvailableBrokers();
@@ -740,7 +727,7 @@ public class NamespaceService implements AutoCloseable {
      * @return the least loaded broker addresses
      * @throws Exception if an error occurs
      */
-    private Optional<Pair<String, String>> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
+    private Optional<String> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
         Optional<ResourceUnit> leastLoadedBroker = loadManager.get().getLeastLoaded(serviceUnit);
         if (leastLoadedBroker.isEmpty()) {
             LOG.warn("No broker is available for {}", serviceUnit);
@@ -748,15 +735,13 @@ public class NamespaceService implements AutoCloseable {
         }
 
         String lookupAddress = leastLoadedBroker.get().getResourceId();
-        String advertisedAddr = (String) leastLoadedBroker.get()
-                .getProperty(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("{} : redirecting to the least loaded broker, lookup address={}",
-                    pulsar.getSafeWebServiceAddress(),
+                    pulsar.getBrokerId(),
                     lookupAddress);
         }
-        return Optional.of(Pair.of(lookupAddress, advertisedAddr));
+        return Optional.of(lookupAddress);
     }
 
     public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle) {
@@ -1564,7 +1549,7 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public void unloadSLANamespace() throws Exception {
-        NamespaceName namespaceName = getSLAMonitorNamespace(pulsar.getLookupServiceAddress(), config);
+        NamespaceName namespaceName = getSLAMonitorNamespace(pulsar.getBrokerId(), config);
 
         LOG.info("Checking owner for SLA namespace {}", namespaceName);
 
@@ -1597,7 +1582,7 @@ public class NamespaceService implements AutoCloseable {
         Matcher m = HEARTBEAT_NAMESPACE_PATTERN.matcher(ns.getNamespaceObject().toString());
         if (m.matches()) {
             LOG.debug("Heartbeat namespace matched the lookup namespace {}", ns.getNamespaceObject().toString());
-            return String.format("http://%s", m.group(1));
+            return m.group(1);
         } else {
             return null;
         }
@@ -1607,7 +1592,7 @@ public class NamespaceService implements AutoCloseable {
         Matcher m = HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(ns.getNamespaceObject().toString());
         if (m.matches()) {
             LOG.debug("Heartbeat namespace v2 matched the lookup namespace {}", ns.getNamespaceObject().toString());
-            return String.format("http://%s", m.group(1));
+            return m.group(1);
         } else {
             return null;
         }
@@ -1616,7 +1601,7 @@ public class NamespaceService implements AutoCloseable {
     public static String getSLAMonitorBrokerName(ServiceUnitId ns) {
         Matcher m = SLA_NAMESPACE_PATTERN.matcher(ns.getNamespaceObject().toString());
         if (m.matches()) {
-            return String.format("http://%s", m.group(1));
+            return m.group(1);
         } else {
             return null;
         }
@@ -1647,16 +1632,16 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public boolean registerSLANamespace() throws PulsarServerException {
-        String lookupServiceAddress = pulsar.getLookupServiceAddress();
-        boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(lookupServiceAddress, config), false);
+        String brokerId = pulsar.getBrokerId();
+        boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(brokerId, config), false);
         if (isNameSpaceRegistered) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Added SLA Monitoring namespace name in local cache: ns={}",
-                        getSLAMonitorNamespace(lookupServiceAddress, config));
+                        getSLAMonitorNamespace(brokerId, config));
             }
         } else if (LOG.isDebugEnabled()) {
             LOG.debug("SLA Monitoring not owned by the broker: ns={}",
-                    getSLAMonitorNamespace(lookupServiceAddress, config));
+                    getSLAMonitorNamespace(brokerId, config));
         }
         return isNameSpaceRegistered;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8642815430a..62197900076 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2112,7 +2112,7 @@ public class BrokerService implements Closeable {
                     } else {
                         String msg = String.format("Namespace bundle for topic (%s) not served by this instance:%s. "
                                         + "Please redo the lookup. Request is denied: namespace=%s",
-                                topic, pulsar.getLookupServiceAddress(), topicName.getNamespace());
+                                topic, pulsar.getBrokerId(), topicName.getNamespace());
                         log.warn(msg);
                         return FutureUtil.failedFuture(new ServiceUnitNotReadyException(msg));
                     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 00cf3a6583b..2fa85f262de 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -968,7 +968,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
         });
 
         stats.topicEpoch = topicEpoch.orElse(null);
-        stats.ownerBroker = brokerService.pulsar().getLookupServiceAddress();
+        stats.ownerBroker = brokerService.pulsar().getBrokerId();
         future.complete(stats);
         return future;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6194d76b3b7..4e7a1392c83 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2409,7 +2409,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         stats.backlogSize = ledger.getEstimatedBacklogSize();
         stats.deduplicationStatus = messageDeduplication.getStatus().toString();
         stats.topicEpoch = topicEpoch.orElse(null);
-        stats.ownerBroker = brokerService.pulsar().getLookupServiceAddress();
+        stats.ownerBroker = brokerService.pulsar().getBrokerId();
         stats.offloadedStorageSize = ledger.getOffloadedSize();
         stats.lastOffloadLedgerId = ledger.getLastOffloadedLedgerId();
         stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index e8192cde3fd..e23286ae449 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -56,7 +56,9 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationParameters;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.BookieResources;
@@ -93,6 +95,7 @@ import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.path.PolicyPath;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
@@ -1199,24 +1202,43 @@ public abstract class PulsarWebResource {
     /**
      * Redirect the call to the specified broker.
      *
-     * @param broker
-     *            Broker name
+     * @param brokerId broker's id (lookup service address)
      */
-    protected void validateBrokerName(String broker) {
-        String brokerUrl = String.format("http://%s", broker);
-        String brokerUrlTls = String.format("https://%s", broker);
-        if (!brokerUrl.equals(pulsar().getWebServiceAddress())
-                && !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
-            String[] parts = broker.split(":");
-            checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker));
-            String host = parts[0];
-            int port = Integer.parseInt(parts[1]);
-
-            URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(host).port(port).build();
-            log.debug("[{}] Redirecting the rest call to {}: broker={}", clientAppId(), redirect, broker);
-            throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
-
+    protected CompletableFuture<Void> maybeRedirectToBroker(String brokerId) {
+        // backwards compatibility
+        String cleanedBrokerId = brokerId.replaceFirst("http[s]?://", "");
+        if (pulsar.getBrokerId().equals(cleanedBrokerId)
+                // backwards compatibility
+                || ("http://" + cleanedBrokerId).equals(pulsar().getWebServiceAddress())
+                || ("https://" + cleanedBrokerId).equals(pulsar().getWebServiceAddressTls())) {
+            // no need to redirect, the current broker matches the given broker id
+            return CompletableFuture.completedFuture(null);
         }
+        LockManager<BrokerLookupData> brokerLookupDataLockManager =
+                pulsar().getCoordinationService().getLockManager(BrokerLookupData.class);
+        return brokerLookupDataLockManager.readLock(LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + cleanedBrokerId)
+                .thenAccept(brokerLookupDataOptional -> {
+                    if (brokerLookupDataOptional.isEmpty()) {
+                        throw new RestException(Status.NOT_FOUND,
+                                "Broker id '" + brokerId + "' not found in available brokers.");
+                    }
+                    brokerLookupDataOptional.ifPresent(brokerLookupData -> {
+                        URI targetBrokerUri;
+                        if ((isRequestHttps() || StringUtils.isBlank(brokerLookupData.getWebServiceUrl()))
+                                && StringUtils.isNotBlank(brokerLookupData.getWebServiceUrlTls())) {
+                            targetBrokerUri = URI.create(brokerLookupData.getWebServiceUrlTls());
+                        } else {
+                            targetBrokerUri = URI.create(brokerLookupData.getWebServiceUrl());
+                        }
+                        URI redirect = UriBuilder.fromUri(uri.getRequestUri())
+                                .scheme(targetBrokerUri.getScheme())
+                                .host(targetBrokerUri.getHost())
+                                .port(targetBrokerUri.getPort()).build();
+                        log.debug("[{}] Redirecting the rest call to {}: broker={}", clientAppId(), redirect,
+                                cleanedBrokerId);
+                        throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+                    });
+                });
     }
 
     public void validateTopicPolicyOperation(TopicName topicName, PolicyName policy, PolicyOperation operation) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index 47949d7312b..4a6524bf245 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -102,9 +102,9 @@ public class SLAMonitoringTest {
 
         createTenant(pulsarAdmins[BROKER_COUNT - 1]);
         for (int i = 0; i < BROKER_COUNT; i++) {
-            String topic = String.format("%s/%s/%s:%s", NamespaceService.SLA_NAMESPACE_PROPERTY, "my-cluster",
-                    pulsarServices[i].getAdvertisedAddress(), brokerWebServicePorts[i]);
-            pulsarAdmins[0].namespaces().createNamespace(topic);
+            var namespaceName = NamespaceService.getSLAMonitorNamespace(pulsarServices[i].getBrokerId(),
+                    pulsarServices[i].getConfig());
+            pulsarAdmins[0].namespaces().createNamespace(namespaceName.toString());
         }
     }
 
@@ -173,9 +173,9 @@ public class SLAMonitoringTest {
     public void testOwnershipViaAdminAfterSetup() {
         for (int i = 0; i < BROKER_COUNT; i++) {
             try {
-                String topic = String.format("persistent://%s/%s/%s:%s/%s",
-                        NamespaceService.SLA_NAMESPACE_PROPERTY, "my-cluster", pulsarServices[i].getAdvertisedAddress(),
-                        brokerWebServicePorts[i], "my-topic");
+                String topic = String.format("persistent://%s/%s/%s/%s",
+                        NamespaceService.SLA_NAMESPACE_PROPERTY, "my-cluster",
+                        pulsarServices[i].getBrokerId(), "my-topic");
                 assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic),
                         "pulsar://" + pulsarServices[i].getAdvertisedAddress() + ":" + brokerNativeBrokerPorts[i]);
             } catch (Exception e) {
@@ -199,8 +199,8 @@ public class SLAMonitoringTest {
             fail("Should be a able to close the broker index " + crashIndex + " Exception: " + e);
         }
 
-        String topic = String.format("persistent://%s/%s/%s:%s/%s", NamespaceService.SLA_NAMESPACE_PROPERTY,
-                "my-cluster", pulsarServices[crashIndex].getAdvertisedAddress(), brokerWebServicePorts[crashIndex],
+        String topic = String.format("persistent://%s/%s/%s/%s", NamespaceService.SLA_NAMESPACE_PROPERTY,
+                "my-cluster", pulsarServices[crashIndex].getBrokerId(),
                 "my-topic");
 
         log.info("Lookup for namespace {}", topic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java
index de4cf9658b2..7c9154a27ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java
@@ -81,9 +81,8 @@ public class AdminApiMultiBrokersTest extends MultiBrokerBaseTest {
         assertTrue(leaderBroker.isPresent());
         log.info("Leader broker is {}", leaderBroker);
         for (PulsarAdmin admin : getAllAdmins()) {
-            String serviceUrl = admin.brokers().getLeaderBroker().getServiceUrl();
-            log.info("Pulsar admin get leader broker is {}", serviceUrl);
-            assertEquals(leaderBroker.get().getServiceUrl(), serviceUrl);
+            String brokerId = admin.brokers().getLeaderBroker().getBrokerId();
+            assertEquals(leaderBroker.get().getBrokerId(), brokerId);
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 0df37835670..93cf369f7dd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -333,10 +333,12 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         } catch (PulsarAdminException e) {
             assertTrue(e instanceof PreconditionFailedException);
         }
+
+        restartBroker();
     }
 
     @Test
-    public void clusterNamespaceIsolationPolicies() throws PulsarAdminException {
+    public void clusterNamespaceIsolationPolicies() throws Exception {
         try {
             // create
             String policyName1 = "policy-1";
@@ -512,6 +514,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
             // Ok
         }
 
+        restartBroker();
     }
 
     @Test
@@ -529,7 +532,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(list2.size(), 1);
 
         BrokerInfo leaderBroker = admin.brokers().getLeaderBroker();
-        Assert.assertEquals(leaderBroker.getServiceUrl(), pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getServiceUrl).get());
+        Assert.assertEquals(leaderBroker.getBrokerId(),
+                pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getBrokerId).get());
 
         Map<String, NamespaceOwnershipStatus> nsMap = admin.brokers().getOwnedNamespaces("test", list.get(0));
         // since sla-monitor ns is not created nsMap.size() == 1 (for HeartBeat Namespace)
@@ -537,7 +541,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         for (String ns : nsMap.keySet()) {
             NamespaceOwnershipStatus nsStatus = nsMap.get(ns);
             if (ns.equals(
-                    NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), pulsar.getConfiguration())
+                    NamespaceService.getHeartbeatNamespace(pulsar.getBrokerId(), pulsar.getConfiguration())
                             + "/0x00000000_0xffffffff")) {
                 assertEquals(nsStatus.broker_assignment, BrokerAssignment.shared);
                 assertFalse(nsStatus.is_controlled);
@@ -703,6 +707,10 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         Awaitility.await().until(() -> pulsar.getConfiguration().getBrokerShutdownTimeoutMs() == newValue);
         // verify value is updated
         assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), newValue);
+        // reset config
+        pulsar.getConfiguration().setBrokerShutdownTimeoutMs(0L);
+        // restart broker
+        restartBroker();
     }
 
     /**
@@ -801,6 +809,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"),
                 Set.of("test", "usw"));
         admin.tenants().updateTenant("prop-xyz", tenantInfo);
+        Awaitility.await().untilAsserted(() ->
+            assertEquals(admin.tenants().getTenantInfo("prop-xyz").getAllowedClusters(), Set.of("test", "usw")));
 
         assertEquals(admin.namespaces().getPolicies("prop-xyz/ns1").bundles, PoliciesUtil.defaultBundle());
 
@@ -3191,6 +3201,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"),
                 Set.of("test", "usw"));
         admin.tenants().updateTenant("prop-xyz", tenantInfo);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin.tenants().getTenantInfo("prop-xyz").getAllowedClusters(),
+                        tenantInfo.getAllowedClusters()));
         admin.namespaces().createNamespace("prop-xyz/getBundleNs", 100);
         assertEquals(admin.namespaces().getPolicies("prop-xyz/getBundleNs").bundles.getNumBundles(), 100);
 
@@ -3384,6 +3397,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"),
                 Set.of("test", "usw"));
         admin.tenants().updateTenant("prop-xyz", tenantInfo);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin.tenants().getTenantInfo("prop-xyz").getAllowedClusters(),
+                        tenantInfo.getAllowedClusters()));
 
         String ns = BrokerTestUtil.newUniqueName("prop-xyz/ns");
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 3caeb591bc8..8a83682c1d2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -726,7 +726,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         Object leaderBrokerRes = asyncRequests(ctx -> brokers.getLeaderBroker(ctx));
         assertTrue(leaderBrokerRes instanceof BrokerInfo);
         BrokerInfo leaderBroker = (BrokerInfo)leaderBrokerRes;
-        assertEquals(leaderBroker.getServiceUrl(), pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getServiceUrl).get());
+        assertEquals(leaderBroker.getBrokerId(),
+                pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getBrokerId).get());
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index d9f2d41a30b..34bc1fa9a6a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -452,7 +452,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         for (String ns : nsMap.keySet()) {
             NamespaceOwnershipStatus nsStatus = nsMap.get(ns);
             if (ns.equals(
-                    NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), pulsar.getConfiguration())
+                    NamespaceService.getHeartbeatNamespace(pulsar.getBrokerId(), pulsar.getConfiguration())
                             + "/0x00000000_0xffffffff")) {
                 assertEquals(nsStatus.broker_assignment, BrokerAssignment.shared);
                 assertFalse(nsStatus.is_controlled);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersMultiBrokerLeaderElectionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersMultiBrokerLeaderElectionTest.java
new file mode 100644
index 00000000000..5adc78b2c52
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersMultiBrokerLeaderElectionTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance;
+
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class AdvertisedListenersMultiBrokerLeaderElectionTest extends MultiBrokerLeaderElectionTest {
+    @Override
+    protected PulsarTestContext.Builder createPulsarTestContextBuilder(ServiceConfiguration conf) {
+        conf.setWebServicePortTls(Optional.of(0));
+        return super.createPulsarTestContextBuilder(conf).preallocatePorts(true).configOverride(config -> {
+            // use advertised address that is different than the name used in the advertised listeners
+            config.setAdvertisedAddress("localhost");
+            config.setAdvertisedListeners(
+                    "public_pulsar:pulsar://127.0.0.1:" + config.getBrokerServicePort().get()
+                            + ",public_http:http://127.0.0.1:" + config.getWebServicePort().get()
+                            + ",public_https:https://127.0.0.1:" + config.getWebServicePortTls().get());
+        });
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index 008897136f8..ded4ee8e58d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -115,7 +115,8 @@ public class LeaderElectionServiceTest {
         checkLookupException(tenant, namespace, client);
 
         // broker, webService and leaderElectionService is started, and elect is done;
-        leaderBrokerReference.set(new LeaderBroker(pulsar.getWebServiceAddress()));
+        leaderBrokerReference.set(
+                new LeaderBroker(pulsar.getBrokerId(), pulsar.getSafeWebServiceAddress()));
 
         Producer<byte[]> producer = client.newProducer()
                 .topic("persistent://" + tenant + "/" + namespace + "/1p")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index 50afb71ea09..e4a66b1201c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -133,7 +133,7 @@ public class LoadBalancerTest {
             brokerNativeBrokerPorts[i] = pulsarServices[i].getBrokerListenPort().get();
 
             brokerUrls[i] = new URL("http://127.0.0.1" + ":" + brokerWebServicePorts[i]);
-            lookupAddresses[i] = pulsarServices[i].getAdvertisedAddress() + ":" + pulsarServices[i].getListenPortHTTP().get();
+            lookupAddresses[i] = pulsarServices[i].getBrokerId();
             pulsarAdmins[i] = PulsarAdmin.builder().serviceHttpUrl(brokerUrls[i].toString()).build();
         }
 
@@ -401,7 +401,7 @@ public class LoadBalancerTest {
         double expectedMaxVariation = 10.0;
         for (int i = 0; i < BROKER_COUNT; i++) {
             long actualValue = 0;
-            String resourceId = "http://" + lookupAddresses[i];
+            String resourceId = lookupAddresses[i];
             if (namespaceOwner.containsKey(resourceId)) {
                 actualValue = namespaceOwner.get(resourceId);
             }
@@ -681,7 +681,7 @@ public class LoadBalancerTest {
                 }
             }
             // Make sure all brokers see the same leader
-            log.info("Old leader is : {}", oldLeader.getServiceUrl());
+            log.info("Old leader is : {}", oldLeader.getBrokerId());
             for (PulsarService pulsar : activePulsar) {
                 log.info("Current leader for {} is : {}", pulsar.getWebServiceAddress(), pulsar.getLeaderElectionService().getCurrentLeader());
                 assertEquals(pulsar.getLeaderElectionService().readCurrentLeader().join(), Optional.of(oldLeader));
@@ -691,7 +691,7 @@ public class LoadBalancerTest {
             leaderPulsar.close();
             loopUntilLeaderChangesForAllBroker(followerPulsar, oldLeader);
             LeaderBroker newLeader = followerPulsar.get(0).getLeaderElectionService().readCurrentLeader().join().get();
-            log.info("New leader is : {}", newLeader.getServiceUrl());
+            log.info("New leader is : {}", newLeader.getBrokerId());
             Assert.assertNotEquals(newLeader, oldLeader);
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
index 5c840129dd8..a7eaffc147b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.loadbalance;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -36,14 +37,24 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.MultiBrokerTestZKBaseTest;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.admin.Lookup;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
 import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker")
 public class MultiBrokerLeaderElectionTest extends MultiBrokerTestZKBaseTest {
+    public MultiBrokerLeaderElectionTest() {
+        super();
+        this.isTcpLookup = true;
+    }
+
     @Override
     protected int numberOfAdditionalBrokers() {
         return 9;
@@ -88,39 +99,80 @@ public class MultiBrokerLeaderElectionTest extends MultiBrokerTestZKBaseTest {
         });
     }
 
-    @Test
-    public void shouldProvideConsistentAnswerToTopicLookups()
+    @Test(timeOut = 60000L)
+    public void shouldProvideConsistentAnswerToTopicLookupsUsingAdminApi()
             throws PulsarAdminException, ExecutionException, InterruptedException {
-        String topicNameBase = "persistent://public/default/lookuptest" + UUID.randomUUID() + "-";
+        String namespace = "public/ns" + UUID.randomUUID();
+        admin.namespaces().createNamespace(namespace, 256);
+        String topicNameBase = "persistent://" + namespace + "/lookuptest-";
         List<String> topicNames = IntStream.range(0, 500).mapToObj(i -> topicNameBase + i)
                 .collect(Collectors.toList());
         List<PulsarAdmin> allAdmins = getAllAdmins();
         @Cleanup("shutdown")
         ExecutorService executorService = Executors.newFixedThreadPool(allAdmins.size());
         List<Future<List<String>>> resultFutures = new ArrayList<>();
-        String leaderBrokerUrl = admin.brokers().getLeaderBroker().getServiceUrl();
-        log.info("LEADER is {}", leaderBrokerUrl);
         // use Phaser to increase the chances of a race condition by triggering all threads once
-        // they are waiting just before the lookupTopic call
+        // they are waiting just before each lookupTopic call
         final Phaser phaser = new Phaser(1);
         for (PulsarAdmin brokerAdmin : allAdmins) {
-            if (!leaderBrokerUrl.equals(brokerAdmin.getServiceUrl())) {
-                phaser.register();
-                log.info("Doing lookup to broker {}", brokerAdmin.getServiceUrl());
-                resultFutures.add(executorService.submit(() -> {
-                    phaser.arriveAndAwaitAdvance();
-                    return topicNames.stream().map(topicName -> {
-                        try {
-                            return brokerAdmin.lookups().lookupTopic(topicName);
-                        } catch (PulsarAdminException e) {
-                            log.error("Error looking up topic {} in {}", topicName, brokerAdmin.getServiceUrl());
-                            throw new RuntimeException(e);
-                        }
-                    }).collect(Collectors.toList());
-                }));
+            phaser.register();
+            Lookup lookups = brokerAdmin.lookups();
+            log.info("Doing lookup to broker {}", brokerAdmin.getServiceUrl());
+            resultFutures.add(executorService.submit(() -> topicNames.stream().map(topicName -> {
+                phaser.arriveAndAwaitAdvance();
+                try {
+                    return lookups.lookupTopic(topicName);
+                } catch (PulsarAdminException e) {
+                    log.error("Error looking up topic {} in {}", topicName, brokerAdmin.getServiceUrl());
+                    throw new RuntimeException(e);
+                }
+            }).collect(Collectors.toList())));
+        }
+        phaser.arriveAndDeregister();
+        List<String> firstResult = null;
+        for (Future<List<String>> resultFuture : resultFutures) {
+            List<String> result = resultFuture.get();
+            if (firstResult == null) {
+                firstResult = result;
+            } else {
+                assertEquals(result, firstResult, "The lookup results weren't consistent.");
             }
         }
-        phaser.arriveAndAwaitAdvance();
+    }
+
+    @Test(timeOut = 60000L)
+    public void shouldProvideConsistentAnswerToTopicLookupsUsingClient()
+            throws PulsarAdminException, ExecutionException, InterruptedException {
+        String namespace = "public/ns" + UUID.randomUUID();
+        admin.namespaces().createNamespace(namespace, 256);
+        String topicNameBase = "persistent://" + namespace + "/lookuptest-";
+        List<String> topicNames = IntStream.range(0, 500).mapToObj(i -> topicNameBase + i)
+                .collect(Collectors.toList());
+        List<PulsarClient> allClients = getAllClients();
+        @Cleanup("shutdown")
+        ExecutorService executorService = Executors.newFixedThreadPool(allClients.size());
+        List<Future<List<String>>> resultFutures = new ArrayList<>();
+        // use Phaser to increase the chances of a race condition by triggering all threads once
+        // they are waiting just before each lookupTopic call
+        final Phaser phaser = new Phaser(1);
+        for (PulsarClient brokerClient : allClients) {
+            phaser.register();
+            String serviceUrl = ((PulsarClientImpl) brokerClient).getConfiguration().getServiceUrl();
+            LookupService lookupService = ((PulsarClientImpl) brokerClient).getLookup();
+            log.info("Doing lookup to broker {}", serviceUrl);
+            resultFutures.add(executorService.submit(() -> topicNames.stream().map(topicName -> {
+                phaser.arriveAndAwaitAdvance();
+                try {
+                    InetSocketAddress logicalAddress =
+                            lookupService.getBroker(TopicName.get(topicName)).get().getLogicalAddress();
+                    return logicalAddress.getHostString() + ":" + logicalAddress.getPort();
+                } catch (InterruptedException | ExecutionException e) {
+                    log.error("Error looking up topic {} in {}", topicName, serviceUrl);
+                    throw new RuntimeException(e);
+                }
+            }).collect(Collectors.toList())));
+        }
+        phaser.arriveAndDeregister();
         List<String> firstResult = null;
         for (Future<List<String>> resultFuture : resultFutures) {
             List<String> result = resultFuture.get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 43706129fbe..f6154e3ec8e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -24,7 +24,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -213,7 +213,7 @@ public class SimpleLoadManagerImplTest {
         rd.put("bandwidthIn", new ResourceUsage(250 * 1024, 1024 * 1024));
         rd.put("bandwidthOut", new ResourceUsage(550 * 1024, 1024 * 1024));
 
-        ResourceUnit ru1 = new SimpleResourceUnit("http://prod2-broker7.messaging.usw.example.com:8080", rd);
+        ResourceUnit ru1 = new SimpleResourceUnit("prod2-broker7.messaging.usw.example.com:8080", rd);
         Set<ResourceUnit> rus = new HashSet<>();
         rus.add(ru1);
         LoadRanker lr = new ResourceAvailabilityRanker();
@@ -249,15 +249,15 @@ public class SimpleLoadManagerImplTest {
         rd.put("bandwidthIn", new ResourceUsage(250 * 1024, 1024 * 1024));
         rd.put("bandwidthOut", new ResourceUsage(550 * 1024, 1024 * 1024));
 
-        ResourceUnit ru1 = new SimpleResourceUnit(
-                "http://" + pulsar1.getAdvertisedAddress() + ":" + pulsar1.getConfiguration().getWebServicePort().get(), rd);
+        ResourceUnit ru1 = new SimpleResourceUnit(pulsar1.getBrokerId(), rd);
         Set<ResourceUnit> rus = new HashSet<>();
         rus.add(ru1);
         LoadRanker lr = new ResourceAvailabilityRanker();
 
         // inject the load report and rankings
         Map<ResourceUnit, org.apache.pulsar.policies.data.loadbalancer.LoadReport> loadReports = new HashMap<>();
-        org.apache.pulsar.policies.data.loadbalancer.LoadReport loadReport = new org.apache.pulsar.policies.data.loadbalancer.LoadReport();
+        org.apache.pulsar.policies.data.loadbalancer.LoadReport loadReport =
+                new org.apache.pulsar.policies.data.loadbalancer.LoadReport();
         loadReport.setSystemResourceUsage(new SystemResourceUsage());
         loadReports.put(ru1, loadReport);
         setObjectField(SimpleLoadManagerImpl.class, loadManager, "currentLoadReports", loadReports);
@@ -272,10 +272,9 @@ public class SimpleLoadManagerImplTest {
         sortedRankingsInstance.get().put(lr.getRank(rd), rus);
         setObjectField(SimpleLoadManagerImpl.class, loadManager, "sortedRankings", sortedRankingsInstance);
 
-        final Optional<ResourceUnit> leastLoaded = loadManager.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10"));
-        // broker is not active so found should be null
-        assertFalse(leastLoaded.isPresent());
-
+        ResourceUnit found = loadManager.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")).get();
+        // TODO: this test doesn't make sense. This was the original assertion.
+        assertNotEquals(found, null, "did not find a broker when expected one to be found");
     }
 
     @Test(enabled = false)
@@ -290,7 +289,7 @@ public class SimpleLoadManagerImplTest {
         rd.put("bandwidthIn", new ResourceUsage(250 * 1024, 1024 * 1024));
         rd.put("bandwidthOut", new ResourceUsage(550 * 1024, 1024 * 1024));
 
-        ResourceUnit ru1 = new SimpleResourceUnit("http://prod2-broker7.messaging.usw.example.com:8080", rd);
+        ResourceUnit ru1 = new SimpleResourceUnit("prod2-broker7.messaging.usw.example.com:8080", rd);
         Set<ResourceUnit> rus = new HashSet<>();
         rus.add(ru1);
         LoadRanker lr = new ResourceAvailabilityRanker();
@@ -360,8 +359,8 @@ public class SimpleLoadManagerImplTest {
         rd.put("bandwidthIn", new ResourceUsage(250 * 1024, 1024 * 1024));
         rd.put("bandwidthOut", new ResourceUsage(550 * 1024, 1024 * 1024));
 
-        ResourceUnit ru1 = new SimpleResourceUnit("http://pulsar-broker1.com:8080", rd);
-        ResourceUnit ru2 = new SimpleResourceUnit("http://pulsar-broker2.com:8080", rd);
+        ResourceUnit ru1 = new SimpleResourceUnit("pulsar-broker1.com:8080", rd);
+        ResourceUnit ru2 = new SimpleResourceUnit("pulsar-broker2.com:8080", rd);
         Set<ResourceUnit> rus = new HashSet<>();
         rus.add(ru1);
         rus.add(ru2);
@@ -414,22 +413,18 @@ public class SimpleLoadManagerImplTest {
         final SimpleLoadManagerImpl loadManager = (SimpleLoadManagerImpl) pulsar1.getLoadManager().get();
 
         for (final NamespaceBundle bundle : bundles) {
-            if (loadManager.getLeastLoaded(bundle).get().getResourceId().equals(getAddress(primaryTlsHost))) {
+            if (loadManager.getLeastLoaded(bundle).get().getResourceId().equals(pulsar1.getBrokerId())) {
                 ++numAssignedToPrimary;
             } else {
                 ++numAssignedToSecondary;
             }
             // Check that number of assigned bundles are equivalent when an even number have been assigned.
             if ((numAssignedToPrimary + numAssignedToSecondary) % 2 == 0) {
-                assert (numAssignedToPrimary == numAssignedToSecondary);
+                assertEquals(numAssignedToPrimary, numAssignedToSecondary);
             }
         }
     }
 
-    private static String getAddress(String url) {
-        return url.replaceAll("https", "http");
-    }
-
     @Test
     public void testNamespaceBundleStats() {
         NamespaceBundleStats nsb1 = new NamespaceBundleStats();
@@ -519,7 +514,8 @@ public class SimpleLoadManagerImplTest {
     }
 
     private void setupClusters() throws PulsarAdminException {
-        admin1.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build());
+        admin1.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress())
+                .brokerServiceUrl(pulsar1.getBrokerServiceUrl()).build());
         TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use"));
         defaultTenant = "prop-xyz";
         admin1.tenants().createTenant(defaultTenant, tenantInfo);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index fca41837b9d..fdd1eb7272c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -75,7 +75,7 @@ public class BrokerRegistryTest {
     private LocalBookkeeperEnsemble bkEnsemble;
 
 
-    // Make sure the load manager don't register itself to `/loadbalance/brokers/{lookupServiceAddress}`
+    // Make sure the load manager don't register itself to `/loadbalance/brokers/{brokerId}`.
     public static class MockLoadManager implements LoadManager {
 
         @Override
@@ -291,7 +291,7 @@ public class BrokerRegistryTest {
         pulsar1.start();
         pulsar2.start();
 
-        doReturn(pulsar1.getLookupServiceAddress()).when(pulsar2).getLookupServiceAddress();
+        doReturn(pulsar1.getBrokerId()).when(pulsar2).getBrokerId();
         BrokerRegistryImpl brokerRegistry1 = createBrokerRegistryImpl(pulsar1);
         BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2);
         brokerRegistry1.start();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index bb7416ddc41..af150b44df8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -314,7 +314,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
             public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> brokers,
                                                                                 ServiceUnitId serviceUnit,
                                                                                 LoadManagerContext context) {
-                brokers.remove(pulsar1.getLookupServiceAddress());
+                brokers.remove(pulsar1.getBrokerId());
                 return CompletableFuture.completedFuture(brokers);
             }
 
@@ -399,10 +399,10 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
         });
 
 
-        String dstBrokerUrl = pulsar1.getLookupServiceAddress();
+        String dstBrokerUrl = pulsar1.getBrokerId();
         String dstBrokerServiceUrl;
         if (broker.equals(pulsar1.getBrokerServiceUrl())) {
-            dstBrokerUrl = pulsar2.getLookupServiceAddress();
+            dstBrokerUrl = pulsar2.getBrokerId();
             dstBrokerServiceUrl = pulsar2.getBrokerServiceUrl();
         } else {
             dstBrokerServiceUrl = pulsar1.getBrokerServiceUrl();
@@ -482,10 +482,10 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
             final String dstBrokerUrl;
             final String dstBrokerServiceUrl;
             if (broker.equals(pulsar1.getBrokerServiceUrl())) {
-                dstBrokerUrl = pulsar2.getLookupServiceAddress();
+                dstBrokerUrl = pulsar2.getBrokerId();
                 dstBrokerServiceUrl = pulsar2.getBrokerServiceUrl();
             } else {
-                dstBrokerUrl = pulsar1.getLookupServiceAddress();
+                dstBrokerUrl = pulsar1.getBrokerId();
                 dstBrokerServiceUrl = pulsar1.getBrokerServiceUrl();
             }
             checkOwnershipState(broker, bundle);
@@ -826,13 +826,13 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
         TopicName topicName = topicAndBundle.getLeft();
         NamespaceBundle bundle = topicAndBundle.getRight();
 
-        String lookupServiceAddress1 = pulsar1.getLookupServiceAddress();
+        String brokerId1 = pulsar1.getBrokerId();
         doReturn(List.of(new MockBrokerFilter() {
             @Override
             public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> brokers,
                                                                                 ServiceUnitId serviceUnit,
                                                                                 LoadManagerContext context) {
-                brokers.remove(lookupServiceAddress1);
+                brokers.remove(brokerId1);
                 return CompletableFuture.completedFuture(brokers);
             }
         },new MockBrokerFilter() {
@@ -904,12 +904,12 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
                     // Test lookup heartbeat namespace's topic
                     for (PulsarService pulsar : pulsarServices) {
                         assertLookupHeartbeatOwner(pulsarService,
-                                pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+                                pulsar.getBrokerId(), pulsar.getBrokerServiceUrl());
                     }
                     // Test lookup SLA namespace's topic
                     for (PulsarService pulsar : pulsarServices) {
                         assertLookupSLANamespaceOwner(pulsarService,
-                                pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+                                pulsar.getBrokerId(), pulsar.getBrokerServiceUrl());
                     }
                 }
 
@@ -966,12 +966,12 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
                         // Test lookup heartbeat namespace's topic
                         for (PulsarService pulsar : pulsarServices) {
                             assertLookupHeartbeatOwner(pulsarService,
-                                    pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+                                    pulsar.getBrokerId(), pulsar.getBrokerServiceUrl());
                         }
                         // Test lookup SLA namespace's topic
                         for (PulsarService pulsar : pulsarServices) {
                             assertLookupSLANamespaceOwner(pulsarService,
-                                    pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+                                    pulsar.getBrokerId(), pulsar.getBrokerServiceUrl());
                         }
                     }
                 }
@@ -979,25 +979,25 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
     }
 
     private void assertLookupHeartbeatOwner(PulsarService pulsar,
-                                            String lookupServiceAddress,
+                                            String brokerId,
                                             String expectedBrokerServiceUrl) throws Exception {
         NamespaceName heartbeatNamespaceV1 =
-                getHeartbeatNamespace(lookupServiceAddress, pulsar.getConfiguration());
+                getHeartbeatNamespace(brokerId, pulsar.getConfiguration());
 
         String heartbeatV1Topic = heartbeatNamespaceV1.getPersistentTopicName("test");
         assertEquals(pulsar.getAdminClient().lookups().lookupTopic(heartbeatV1Topic), expectedBrokerServiceUrl);
 
         NamespaceName heartbeatNamespaceV2 =
-                getHeartbeatNamespaceV2(lookupServiceAddress, pulsar.getConfiguration());
+                getHeartbeatNamespaceV2(brokerId, pulsar.getConfiguration());
 
         String heartbeatV2Topic = heartbeatNamespaceV2.getPersistentTopicName("test");
         assertEquals(pulsar.getAdminClient().lookups().lookupTopic(heartbeatV2Topic), expectedBrokerServiceUrl);
     }
 
     private void assertLookupSLANamespaceOwner(PulsarService pulsar,
-                                               String lookupServiceAddress,
+                                               String brokerId,
                                                String expectedBrokerServiceUrl) throws Exception {
-        NamespaceName slaMonitorNamespace = getSLAMonitorNamespace(lookupServiceAddress, pulsar.getConfiguration());
+        NamespaceName slaMonitorNamespace = getSLAMonitorNamespace(brokerId, pulsar.getConfiguration());
         String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test");
         String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
         log.info("Topic {} Lookup result: {}", slaMonitorTopic, result);
@@ -1338,7 +1338,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
             NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
             if (!pulsar3.getBrokerServiceUrl().equals(lookupResult1)) {
                 admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(),
-                        pulsar3.getLookupServiceAddress());
+                        pulsar3.getBrokerId());
                 lookupResult1 = pulsar2.getAdminClient().lookups().lookupTopic(topic);
             }
             String lookupResult2 = pulsar1.getAdminClient().lookups().lookupTopic(topic);
@@ -1405,20 +1405,20 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
     @Test(timeOut = 30 * 1000, priority = -1)
     public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exception {
         NamespaceName heartbeatNamespacePulsar1V1 =
-                getHeartbeatNamespace(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration());
+                getHeartbeatNamespace(pulsar1.getBrokerId(), pulsar1.getConfiguration());
         NamespaceName heartbeatNamespacePulsar1V2 =
-                NamespaceService.getHeartbeatNamespaceV2(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration());
+                NamespaceService.getHeartbeatNamespaceV2(pulsar1.getBrokerId(), pulsar1.getConfiguration());
 
         NamespaceName heartbeatNamespacePulsar2V1 =
-                getHeartbeatNamespace(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
+                getHeartbeatNamespace(pulsar2.getBrokerId(), pulsar2.getConfiguration());
         NamespaceName heartbeatNamespacePulsar2V2 =
-                NamespaceService.getHeartbeatNamespaceV2(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
+                NamespaceService.getHeartbeatNamespaceV2(pulsar2.getBrokerId(), pulsar2.getConfiguration());
 
         NamespaceName slaMonitorNamespacePulsar1 =
-                getSLAMonitorNamespace(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration());
+                getSLAMonitorNamespace(pulsar1.getBrokerId(), pulsar1.getConfiguration());
 
         NamespaceName slaMonitorNamespacePulsar2 =
-                getSLAMonitorNamespace(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
+                getSLAMonitorNamespace(pulsar2.getBrokerId(), pulsar2.getConfiguration());
 
         NamespaceBundle bundle1 = pulsar1.getNamespaceService().getNamespaceBundleFactory()
                 .getFullBundle(heartbeatNamespacePulsar1V1);
@@ -1448,9 +1448,9 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
         assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4));
         assertTrue(ownedServiceUnitsByPulsar2.contains(slaBundle2));
         Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 =
-                admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress());
+                admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getBrokerId());
         Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 =
-                admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress());
+                admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getBrokerId());
         assertTrue(ownedNamespacesByPulsar1.containsKey(bundle1.toString()));
         assertTrue(ownedNamespacesByPulsar1.containsKey(bundle2.toString()));
         assertTrue(ownedNamespacesByPulsar1.containsKey(slaBundle1.toString()));
@@ -1482,7 +1482,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
             assertTrue(ownedBundles.contains(bundle));
         });
         Map<String, NamespaceOwnershipStatus> ownedNamespaces =
-                admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar.getLookupServiceAddress());
+                admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar.getBrokerId());
         assertTrue(ownedNamespaces.containsKey(bundle.toString()));
         NamespaceOwnershipStatus status = ownedNamespaces.get(bundle.toString());
         assertTrue(status.is_active);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 7bd12d66704..f7816151a42 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -109,8 +109,8 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
     private PulsarService pulsar2;
     private ServiceUnitStateChannel channel1;
     private ServiceUnitStateChannel channel2;
-    private String lookupServiceAddress1;
-    private String lookupServiceAddress2;
+    private String brokerId1;
+    private String brokerId2;
     private String bundle;
     private String bundle1;
     private String bundle2;
@@ -158,10 +158,10 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
 
         channel2 = createChannel(pulsar2);
         channel2.start();
-        lookupServiceAddress1 = (String)
-                FieldUtils.readDeclaredField(channel1, "lookupServiceAddress", true);
-        lookupServiceAddress2 = (String)
-                FieldUtils.readDeclaredField(channel2, "lookupServiceAddress", true);
+        brokerId1 = (String)
+                FieldUtils.readDeclaredField(channel1, "brokerId", true);
+        brokerId2 = (String)
+                FieldUtils.readDeclaredField(channel2, "brokerId", true);
 
         bundle = "public/default/0x00000000_0xffffffff";
         bundle1 = "public/default/0x00000000_0xfffffff0";
@@ -221,7 +221,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         assertEquals(newChannelOwner1, newChannelOwner2);
         assertNotEquals(channelOwner1, newChannelOwner1);
 
-        if (newChannelOwner1.equals(Optional.of(lookupServiceAddress1))) {
+        if (newChannelOwner1.equals(Optional.of(brokerId1))) {
             assertTrue(channel1.isChannelOwnerAsync().get(2, TimeUnit.SECONDS));
             assertFalse(channel2.isChannelOwnerAsync().get(2, TimeUnit.SECONDS));
         } else {
@@ -306,7 +306,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
             }
         }
         try {
-            channel.publishAssignEventAsync(bundle, lookupServiceAddress1).get(2, TimeUnit.SECONDS);
+            channel.publishAssignEventAsync(bundle, brokerId1).get(2, TimeUnit.SECONDS);
         } catch (ExecutionException e) {
             if (e.getCause() instanceof IllegalStateException) {
                 errorCnt++;
@@ -314,7 +314,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         }
         try {
             channel.publishUnloadEventAsync(
-                    new Unload(lookupServiceAddress1, bundle, Optional.of(lookupServiceAddress2)))
+                    new Unload(brokerId1, bundle, Optional.of(brokerId2)))
                     .get(2, TimeUnit.SECONDS);
         } catch (ExecutionException e) {
             if (e.getCause() instanceof IllegalStateException) {
@@ -322,7 +322,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
             }
         }
         try {
-            Split split = new Split(bundle, lookupServiceAddress1, Map.of(
+            Split split = new Split(bundle, brokerId1, Map.of(
                     childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty()));
             channel.publishSplitEventAsync(split)
                     .get(2, TimeUnit.SECONDS);
@@ -363,8 +363,8 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         assertTrue(owner1.get().isEmpty());
         assertTrue(owner2.get().isEmpty());
 
-        var assigned1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
-        var assigned2 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2);
+        var assigned1 = channel1.publishAssignEventAsync(bundle, brokerId1);
+        var assigned2 = channel2.publishAssignEventAsync(bundle, brokerId2);
         assertNotNull(assigned1);
         assertNotNull(assigned2);
         waitUntilOwnerChanges(channel1, bundle, null);
@@ -373,8 +373,8 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         String assignedAddr2 = assigned2.get(5, TimeUnit.SECONDS);
 
         assertEquals(assignedAddr1, assignedAddr2);
-        assertTrue(assignedAddr1.equals(lookupServiceAddress1)
-                || assignedAddr1.equals(lookupServiceAddress2), assignedAddr1);
+        assertTrue(assignedAddr1.equals(brokerId1)
+                || assignedAddr1.equals(brokerId2), assignedAddr1);
 
         var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
         var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
@@ -415,13 +415,13 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         assertTrue(owner1.get().isEmpty());
         assertTrue(owner2.get().isEmpty());
 
-        var owner3 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
-        var owner4 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2);
+        var owner3 = channel1.publishAssignEventAsync(bundle, brokerId1);
+        var owner4 = channel2.publishAssignEventAsync(bundle, brokerId2);
         assertTrue(owner3.isCompletedExceptionally());
         assertNotNull(owner4);
         String ownerAddrOpt2 = owner4.get(5, TimeUnit.SECONDS);
-        assertEquals(ownerAddrOpt2, lookupServiceAddress2);
-        waitUntilNewOwner(channel1, bundle, lookupServiceAddress2);
+        assertEquals(ownerAddrOpt2, brokerId2);
+        waitUntilNewOwner(channel1, bundle, brokerId2);
         assertEquals(0, getOwnerRequests1.size());
         assertEquals(0, getOwnerRequests2.size());
 
@@ -439,25 +439,25 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         assertTrue(owner2.get().isEmpty());
 
 
-        channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
-        waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
-        waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
+        channel1.publishAssignEventAsync(bundle, brokerId1);
+        waitUntilNewOwner(channel1, bundle, brokerId1);
+        waitUntilNewOwner(channel2, bundle, brokerId1);
         var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
         var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
 
         assertEquals(ownerAddr1, ownerAddr2);
-        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
+        assertEquals(ownerAddr1, Optional.of(brokerId1));
 
-        Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.of(lookupServiceAddress2));
+        Unload unload = new Unload(brokerId1, bundle, Optional.of(brokerId2));
         channel1.publishUnloadEventAsync(unload);
 
-        waitUntilNewOwner(channel1, bundle, lookupServiceAddress2);
-        waitUntilNewOwner(channel2, bundle, lookupServiceAddress2);
+        waitUntilNewOwner(channel1, bundle, brokerId2);
+        waitUntilNewOwner(channel2, bundle, brokerId2);
 
         ownerAddr1 = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS);
         ownerAddr2 = channel2.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS);
         assertEquals(ownerAddr1, ownerAddr2);
-        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress2));
+        assertEquals(ownerAddr1, Optional.of(brokerId2));
 
         validateHandlerCounters(channel1, 2, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
         validateHandlerCounters(channel2, 2, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
@@ -474,14 +474,14 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         assertEquals(0, getOwnerRequests1.size());
         assertEquals(0, getOwnerRequests2.size());
 
-        channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
-        waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
-        waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
+        channel1.publishAssignEventAsync(bundle, brokerId1);
+        waitUntilNewOwner(channel1, bundle, brokerId1);
+        waitUntilNewOwner(channel2, bundle, brokerId1);
         var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
         var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
 
         assertEquals(ownerAddr1, ownerAddr2);
-        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
+        assertEquals(ownerAddr1, Optional.of(brokerId1));
 
         var producer = (Producer<ServiceUnitStateData>) FieldUtils.readDeclaredField(channel1,
                 "producer", true);
@@ -497,7 +497,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
         FieldUtils.writeDeclaredField(channel2,
                 "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
-        Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.of(lookupServiceAddress2));
+        Unload unload = new Unload(brokerId1, bundle, Optional.of(brokerId2));
         channel1.publishUnloadEventAsync(unload);
         // channel1 is broken. the ownership transfer won't be complete.
         waitUntilState(channel1, bundle);
@@ -521,7 +521,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         assertEquals(0, getOwnerRequests2.size());
 
         // recovered, check the monitor update state : Assigned -> Owned
-        doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
+        doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1)))
                 .when(loadManager).selectAsync(any(), any());
         FieldUtils.writeDeclaredField(channel2, "producer", producer, true);
         FieldUtils.writeDeclaredField(channel1,
@@ -530,18 +530,18 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 "inFlightStateWaitingTimeInMillis", 1 , true);
 
         ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
-                List.of(lookupServiceAddress1, lookupServiceAddress2));
+                List.of(brokerId1, brokerId2));
         ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
-                List.of(lookupServiceAddress1, lookupServiceAddress2));
+                List.of(brokerId1, brokerId2));
 
 
-        waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
-        waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
+        waitUntilNewOwner(channel1, bundle, brokerId1);
+        waitUntilNewOwner(channel2, bundle, brokerId1);
         ownerAddr1 = channel1.getOwnerAsync(bundle).get();
         ownerAddr2 = channel2.getOwnerAsync(bundle).get();
 
         assertEquals(ownerAddr1, ownerAddr2);
-        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
+        assertEquals(ownerAddr1, Optional.of(brokerId1));
 
         var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
         validateMonitorCounters(leader,
@@ -562,13 +562,13 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
 
     @Test(priority = 6)
     public void splitAndRetryTest() throws Exception {
-        channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
-        waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
-        waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
+        channel1.publishAssignEventAsync(bundle, brokerId1);
+        waitUntilNewOwner(channel1, bundle, brokerId1);
+        waitUntilNewOwner(channel2, bundle, brokerId1);
         var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
         var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
-        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
-        assertEquals(ownerAddr2, Optional.of(lookupServiceAddress1));
+        assertEquals(ownerAddr1, Optional.of(brokerId1));
+        assertEquals(ownerAddr2, Optional.of(brokerId1));
         assertTrue(ownerAddr1.isPresent());
 
         NamespaceService namespaceService = pulsar1.getNamespaceService();
@@ -609,14 +609,14 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
 
 
 
-        waitUntilNewOwner(channel1, childBundle11, lookupServiceAddress1);
-        waitUntilNewOwner(channel1, childBundle12, lookupServiceAddress1);
-        waitUntilNewOwner(channel2, childBundle11, lookupServiceAddress1);
-        waitUntilNewOwner(channel2, childBundle12, lookupServiceAddress1);
-        assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(childBundle11).get());
-        assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(childBundle12).get());
-        assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle11).get());
-        assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle12).get());
+        waitUntilNewOwner(channel1, childBundle11, brokerId1);
+        waitUntilNewOwner(channel1, childBundle12, brokerId1);
+        waitUntilNewOwner(channel2, childBundle11, brokerId1);
+        waitUntilNewOwner(channel2, childBundle12, brokerId1);
+        assertEquals(Optional.of(brokerId1), channel1.getOwnerAsync(childBundle11).get());
+        assertEquals(Optional.of(brokerId1), channel1.getOwnerAsync(childBundle12).get());
+        assertEquals(Optional.of(brokerId1), channel2.getOwnerAsync(childBundle11).get());
+        assertEquals(Optional.of(brokerId1), channel2.getOwnerAsync(childBundle12).get());
 
 
         // try the monitor and check the monitor moves `Deleted` -> `Init`
@@ -631,9 +631,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 "stateTombstoneDelayTimeInMillis", 1, true);
 
         ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
-                List.of(lookupServiceAddress1, lookupServiceAddress2));
+                List.of(brokerId1, brokerId2));
         ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
-                List.of(lookupServiceAddress1, lookupServiceAddress2));
+                List.of(brokerId1, brokerId2));
         waitUntilState(channel1, bundle, Init);
         waitUntilState(channel2, bundle, Init);
 
@@ -727,7 +727,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         String leader = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
         String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
         assertEquals(leader, leader2);
-        if (leader.equals(lookupServiceAddress2)) {
+        if (leader.equals(brokerId2)) {
             leaderChannel = channel2;
             followerChannel = channel1;
             var tmp = followerCleanupJobsTmp;
@@ -743,12 +743,12 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
 
         var owner1 = channel1.getOwnerAsync(bundle1);
         var owner2 = channel2.getOwnerAsync(bundle2);
-        doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
+        doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2)))
                 .when(loadManager).selectAsync(any(), any());
         assertTrue(owner1.get().isEmpty());
         assertTrue(owner2.get().isEmpty());
 
-        String broker = lookupServiceAddress1;
+        String broker = brokerId1;
         channel1.publishAssignEventAsync(bundle1, broker);
         channel2.publishAssignEventAsync(bundle2, broker);
 
@@ -758,9 +758,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         waitUntilNewOwner(channel2, bundle2, broker);
 
         // Verify to transfer the ownership to the other broker.
-        channel1.publishUnloadEventAsync(new Unload(broker, bundle1, Optional.of(lookupServiceAddress2)));
-        waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
-        waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2);
+        channel1.publishUnloadEventAsync(new Unload(broker, bundle1, Optional.of(brokerId2)));
+        waitUntilNewOwner(channel1, bundle1, brokerId2);
+        waitUntilNewOwner(channel2, bundle1, brokerId2);
 
         // test stable metadata state
         leaderChannel.handleMetadataSessionEvent(SessionReestablished);
@@ -771,13 +771,13 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
         leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
         followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
-        leaderChannel.handleBrokerRegistrationEvent(lookupServiceAddress2, NotificationType.Deleted);
-        followerChannel.handleBrokerRegistrationEvent(lookupServiceAddress2, NotificationType.Deleted);
+        leaderChannel.handleBrokerRegistrationEvent(brokerId2, NotificationType.Deleted);
+        followerChannel.handleBrokerRegistrationEvent(brokerId2, NotificationType.Deleted);
 
-        waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
-        waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2);
-        waitUntilNewOwner(channel1, bundle2, lookupServiceAddress2);
-        waitUntilNewOwner(channel2, bundle2, lookupServiceAddress2);
+        waitUntilNewOwner(channel1, bundle1, brokerId2);
+        waitUntilNewOwner(channel2, bundle1, brokerId2);
+        waitUntilNewOwner(channel1, bundle2, brokerId2);
+        waitUntilNewOwner(channel2, bundle2, brokerId2);
 
         verify(leaderCleanupJobs, times(1)).computeIfAbsent(eq(broker), any());
         verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any());
@@ -798,8 +798,8 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
 
 
         // test jittery metadata state
-        channel1.publishUnloadEventAsync(new Unload(lookupServiceAddress2, bundle1, Optional.of(broker)));
-        channel1.publishUnloadEventAsync(new Unload(lookupServiceAddress2, bundle2, Optional.of(broker)));
+        channel1.publishUnloadEventAsync(new Unload(brokerId2, bundle1, Optional.of(broker)));
+        channel1.publishUnloadEventAsync(new Unload(brokerId2, bundle2, Optional.of(broker)));
         waitUntilNewOwner(channel1, bundle1, broker);
         waitUntilNewOwner(channel2, bundle1, broker);
         waitUntilNewOwner(channel1, bundle2, broker);
@@ -871,10 +871,10 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 1);
 
         // finally cleanup
-        waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
-        waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2);
-        waitUntilNewOwner(channel1, bundle2, lookupServiceAddress2);
-        waitUntilNewOwner(channel2, bundle2, lookupServiceAddress2);
+        waitUntilNewOwner(channel1, bundle1, brokerId2);
+        waitUntilNewOwner(channel2, bundle1, brokerId2);
+        waitUntilNewOwner(channel1, bundle2, brokerId2);
+        waitUntilNewOwner(channel2, bundle2, brokerId2);
 
         verify(leaderCleanupJobs, times(3)).computeIfAbsent(eq(broker), any());
         verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any());
@@ -893,8 +893,8 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 1);
 
         // test unstable state
-        channel1.publishUnloadEventAsync(new Unload(lookupServiceAddress2, bundle1, Optional.of(broker)));
-        channel1.publishUnloadEventAsync(new Unload(lookupServiceAddress2, bundle2, Optional.of(broker)));
+        channel1.publishUnloadEventAsync(new Unload(brokerId2, bundle1, Optional.of(broker)));
+        channel1.publishUnloadEventAsync(new Unload(brokerId2, bundle2, Optional.of(broker)));
         waitUntilNewOwner(channel1, bundle1, broker);
         waitUntilNewOwner(channel2, bundle1, broker);
         waitUntilNewOwner(channel1, bundle2, broker);
@@ -938,17 +938,17 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         assertTrue(owner1.get().isEmpty());
         assertTrue(owner2.get().isEmpty());
 
-        var assigned1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
+        var assigned1 = channel1.publishAssignEventAsync(bundle, brokerId1);
         assertNotNull(assigned1);
 
-        waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
-        waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
+        waitUntilNewOwner(channel1, bundle, brokerId1);
+        waitUntilNewOwner(channel2, bundle, brokerId1);
         String assignedAddr1 = assigned1.get(5, TimeUnit.SECONDS);
-        assertEquals(lookupServiceAddress1, assignedAddr1);
+        assertEquals(brokerId1, assignedAddr1);
 
         FieldUtils.writeDeclaredField(channel2,
                 "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
-        var assigned2 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2);
+        var assigned2 = channel2.publishAssignEventAsync(bundle, brokerId2);
         assertNotNull(assigned2);
         Exception ex = null;
         try {
@@ -957,8 +957,8 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
             ex = e;
         }
         assertNull(ex);
-        assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(bundle).get());
-        assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(bundle).get());
+        assertEquals(Optional.of(brokerId1), channel2.getOwnerAsync(bundle).get());
+        assertEquals(Optional.of(brokerId1), channel1.getOwnerAsync(bundle).get());
 
         var compactor = spy (pulsar1.getStrategicCompactor());
         Field strategicCompactorField = FieldUtils.getDeclaredField(PulsarService.class, "strategicCompactor", true);
@@ -968,7 +968,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 .pollInterval(200, TimeUnit.MILLISECONDS)
                 .atMost(140, TimeUnit.SECONDS)
                 .untilAsserted(() -> {
-                    channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
+                    channel1.publishAssignEventAsync(bundle, brokerId1);
                     verify(compactor, times(1))
                             .compact(eq(ServiceUnitStateChannelImpl.TOPIC), any());
                 });
@@ -980,7 +980,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 .pollInterval(200, TimeUnit.MILLISECONDS)
                 .atMost(5, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertEquals(
-                        channel3.getOwnerAsync(bundle).get(), Optional.of(lookupServiceAddress1)));
+                        channel3.getOwnerAsync(bundle).get(), Optional.of(brokerId1)));
         channel3.close();
         FieldUtils.writeDeclaredField(channel2,
                 "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
@@ -1025,16 +1025,16 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
     public void unloadTest()
             throws ExecutionException, InterruptedException, IllegalAccessException {
 
-        channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
+        channel1.publishAssignEventAsync(bundle, brokerId1);
 
-        waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
-        waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
+        waitUntilNewOwner(channel1, bundle, brokerId1);
+        waitUntilNewOwner(channel2, bundle, brokerId1);
         var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
         var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
 
         assertEquals(ownerAddr1, ownerAddr2);
-        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
-        Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.empty());
+        assertEquals(ownerAddr1, Optional.of(brokerId1));
+        Unload unload = new Unload(brokerId1, bundle, Optional.empty());
 
         channel1.publishUnloadEventAsync(unload);
 
@@ -1046,17 +1046,17 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         assertEquals(Optional.empty(), owner1.get());
         assertEquals(Optional.empty(), owner2.get());
 
-        channel2.publishAssignEventAsync(bundle, lookupServiceAddress2);
+        channel2.publishAssignEventAsync(bundle, brokerId2);
 
-        waitUntilNewOwner(channel1, bundle, lookupServiceAddress2);
-        waitUntilNewOwner(channel2, bundle, lookupServiceAddress2);
+        waitUntilNewOwner(channel1, bundle, brokerId2);
+        waitUntilNewOwner(channel2, bundle, brokerId2);
 
         ownerAddr1 = channel1.getOwnerAsync(bundle).get();
         ownerAddr2 = channel2.getOwnerAsync(bundle).get();
 
         assertEquals(ownerAddr1, ownerAddr2);
-        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress2));
-        Unload unload2 = new Unload(lookupServiceAddress2, bundle, Optional.empty());
+        assertEquals(ownerAddr1, Optional.of(brokerId2));
+        Unload unload2 = new Unload(brokerId2, bundle, Optional.empty());
 
         channel2.publishUnloadEventAsync(unload2);
 
@@ -1075,9 +1075,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 "stateTombstoneDelayTimeInMillis", 1, true);
 
         ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
-                List.of(lookupServiceAddress1, lookupServiceAddress2));
+                List.of(brokerId1, brokerId2));
         ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
-                List.of(lookupServiceAddress1, lookupServiceAddress2));
+                List.of(brokerId1, brokerId2));
         waitUntilState(channel1, bundle, Init);
         waitUntilState(channel2, bundle, Init);
 
@@ -1107,7 +1107,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
     public void assignTestWhenDestBrokerProducerFails()
             throws ExecutionException, InterruptedException, IllegalAccessException {
 
-        Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.empty());
+        Unload unload = new Unload(brokerId1, bundle, Optional.empty());
 
         channel1.publishUnloadEventAsync(unload);
 
@@ -1131,9 +1131,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
         FieldUtils.writeDeclaredField(channel2,
                 "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
-        doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
+        doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2)))
                 .when(loadManager).selectAsync(any(), any());
-        channel1.publishAssignEventAsync(bundle, lookupServiceAddress2);
+        channel1.publishAssignEventAsync(bundle, brokerId2);
         // channel1 is broken. the assign won't be complete.
         waitUntilState(channel1, bundle);
         waitUntilState(channel2, bundle);
@@ -1157,18 +1157,18 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 "inFlightStateWaitingTimeInMillis", 1 , true);
 
         ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
-                List.of(lookupServiceAddress1, lookupServiceAddress2));
+                List.of(brokerId1, brokerId2));
         ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
-                List.of(lookupServiceAddress1, lookupServiceAddress2));
+                List.of(brokerId1, brokerId2));
 
 
-        waitUntilNewOwner(channel1, bundle, lookupServiceAddress2);
-        waitUntilNewOwner(channel2, bundle, lookupServiceAddress2);
+        waitUntilNewOwner(channel1, bundle, brokerId2);
+        waitUntilNewOwner(channel2, bundle, brokerId2);
         var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
         var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
 
         assertEquals(ownerAddr1, ownerAddr2);
-        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress2));
+        assertEquals(ownerAddr1, Optional.of(brokerId2));
 
         var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
         validateMonitorCounters(leader,
@@ -1192,20 +1192,20 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
             throws ExecutionException, InterruptedException, IllegalAccessException {
 
 
-        Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.empty());
+        Unload unload = new Unload(brokerId1, bundle, Optional.empty());
 
         channel1.publishUnloadEventAsync(unload);
 
         waitUntilState(channel1, bundle, Free);
         waitUntilState(channel2, bundle, Free);
 
-        channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
+        channel1.publishAssignEventAsync(bundle, brokerId1);
 
         waitUntilState(channel1, bundle, Owned);
         waitUntilState(channel2, bundle, Owned);
 
-        assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get().get());
-        assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(bundle).get().get());
+        assertEquals(brokerId1, channel1.getOwnerAsync(bundle).get().get());
+        assertEquals(brokerId1, channel2.getOwnerAsync(bundle).get().get());
 
         var producer = (Producer<ServiceUnitStateData>) FieldUtils.readDeclaredField(channel1,
                 "producer", true);
@@ -1224,7 +1224,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         // Assert child bundle ownerships in the channels.
 
 
-        Split split = new Split(bundle, lookupServiceAddress1, Map.of(
+        Split split = new Split(bundle, brokerId1, Map.of(
                 childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty()));
         channel2.publishSplitEventAsync(split);
         // channel1 is broken. the split won't be complete.
@@ -1271,13 +1271,13 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         assertFalse(owner1);
         assertFalse(owner2);
 
-        owner1 = channel1.isOwner(bundle, lookupServiceAddress2);
-        owner2 = channel2.isOwner(bundle, lookupServiceAddress1);
+        owner1 = channel1.isOwner(bundle, brokerId2);
+        owner2 = channel2.isOwner(bundle, brokerId1);
 
         assertFalse(owner1);
         assertFalse(owner2);
 
-        channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
+        channel1.publishAssignEventAsync(bundle, brokerId1);
         owner2 = channel2.isOwner(bundle);
         assertFalse(owner2);
 
@@ -1290,34 +1290,34 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         assertTrue(owner1);
         assertFalse(owner2);
 
-        owner1 = channel1.isOwner(bundle, lookupServiceAddress1);
-        owner2 = channel2.isOwner(bundle, lookupServiceAddress2);
+        owner1 = channel1.isOwner(bundle, brokerId1);
+        owner2 = channel2.isOwner(bundle, brokerId2);
 
         assertTrue(owner1);
         assertFalse(owner2);
 
-        owner1 = channel2.isOwner(bundle, lookupServiceAddress1);
-        owner2 = channel1.isOwner(bundle, lookupServiceAddress2);
+        owner1 = channel2.isOwner(bundle, brokerId1);
+        owner2 = channel1.isOwner(bundle, brokerId2);
 
         assertTrue(owner1);
         assertFalse(owner2);
 
-        overrideTableView(channel1, bundle, new ServiceUnitStateData(Assigning, lookupServiceAddress1, 1));
+        overrideTableView(channel1, bundle, new ServiceUnitStateData(Assigning, brokerId1, 1));
         assertFalse(channel1.isOwner(bundle));
 
-        overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, lookupServiceAddress1, 1));
+        overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, brokerId1, 1));
         assertTrue(channel1.isOwner(bundle));
 
-        overrideTableView(channel1, bundle, new ServiceUnitStateData(Releasing, null, lookupServiceAddress1, 1));
+        overrideTableView(channel1, bundle, new ServiceUnitStateData(Releasing, null, brokerId1, 1));
         assertFalse(channel1.isOwner(bundle));
 
-        overrideTableView(channel1, bundle, new ServiceUnitStateData(Splitting, null, lookupServiceAddress1, 1));
+        overrideTableView(channel1, bundle, new ServiceUnitStateData(Splitting, null, brokerId1, 1));
         assertTrue(channel1.isOwner(bundle));
 
-        overrideTableView(channel1, bundle, new ServiceUnitStateData(Free, null, lookupServiceAddress1, 1));
+        overrideTableView(channel1, bundle, new ServiceUnitStateData(Free, null, brokerId1, 1));
         assertFalse(channel1.isOwner(bundle));
 
-        overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted, null, lookupServiceAddress1, 1));
+        overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted, null, brokerId1, 1));
         assertFalse(channel1.isOwner(bundle));
 
         overrideTableView(channel1, bundle, null);
@@ -1326,13 +1326,13 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
 
     @Test(priority = 16)
     public void splitAndRetryFailureTest() throws Exception {
-        channel1.publishAssignEventAsync(bundle3, lookupServiceAddress1);
-        waitUntilNewOwner(channel1, bundle3, lookupServiceAddress1);
-        waitUntilNewOwner(channel2, bundle3, lookupServiceAddress1);
+        channel1.publishAssignEventAsync(bundle3, brokerId1);
+        waitUntilNewOwner(channel1, bundle3, brokerId1);
+        waitUntilNewOwner(channel2, bundle3, brokerId1);
         var ownerAddr1 = channel1.getOwnerAsync(bundle3).get();
         var ownerAddr2 = channel2.getOwnerAsync(bundle3).get();
-        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
-        assertEquals(ownerAddr2, Optional.of(lookupServiceAddress1));
+        assertEquals(ownerAddr1, Optional.of(brokerId1));
+        assertEquals(ownerAddr2, Optional.of(brokerId1));
         assertTrue(ownerAddr1.isPresent());
 
         NamespaceService namespaceService = pulsar1.getNamespaceService();
@@ -1373,7 +1373,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 });
         var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
         ((ServiceUnitStateChannelImpl) leader)
-                .monitorOwnerships(List.of(lookupServiceAddress1, lookupServiceAddress2));
+                .monitorOwnerships(List.of(brokerId1, brokerId2));
         waitUntilState(leader, bundle3, Deleted);
         waitUntilState(channel1, bundle3, Deleted);
         waitUntilState(channel2, bundle3, Deleted);
@@ -1384,14 +1384,14 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
         validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
 
-        waitUntilNewOwner(channel1, childBundle31, lookupServiceAddress1);
-        waitUntilNewOwner(channel1, childBundle32, lookupServiceAddress1);
-        waitUntilNewOwner(channel2, childBundle31, lookupServiceAddress1);
-        waitUntilNewOwner(channel2, childBundle32, lookupServiceAddress1);
-        assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(childBundle31).get());
-        assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(childBundle32).get());
-        assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle31).get());
-        assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle32).get());
+        waitUntilNewOwner(channel1, childBundle31, brokerId1);
+        waitUntilNewOwner(channel1, childBundle32, brokerId1);
+        waitUntilNewOwner(channel2, childBundle31, brokerId1);
+        waitUntilNewOwner(channel2, childBundle32, brokerId1);
+        assertEquals(Optional.of(brokerId1), channel1.getOwnerAsync(childBundle31).get());
+        assertEquals(Optional.of(brokerId1), channel1.getOwnerAsync(childBundle32).get());
+        assertEquals(Optional.of(brokerId1), channel2.getOwnerAsync(childBundle31).get());
+        assertEquals(Optional.of(brokerId1), channel2.getOwnerAsync(childBundle32).get());
 
 
         // try the monitor and check the monitor moves `Deleted` -> `Init`
@@ -1402,9 +1402,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 "stateTombstoneDelayTimeInMillis", 1, true);
 
         ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
-                List.of(lookupServiceAddress1, lookupServiceAddress2));
+                List.of(brokerId1, brokerId2));
         ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
-                List.of(lookupServiceAddress1, lookupServiceAddress2));
+                List.of(brokerId1, brokerId2));
         waitUntilState(channel1, bundle3, Init);
         waitUntilState(channel2, bundle3, Init);
 
@@ -1440,12 +1440,12 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         String leader = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
         String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
         assertEquals(leader, leader2);
-        if (leader.equals(lookupServiceAddress2)) {
+        if (leader.equals(brokerId2)) {
             leaderChannel = channel2;
             followerChannel = channel1;
         }
 
-        String broker = lookupServiceAddress1;
+        String broker = brokerId1;
 
         // test override states
         String releasingBundle = "public/releasing/0xfffffff0_0xffffffff";
@@ -1470,7 +1470,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 new ServiceUnitStateData(Owned, broker, null, 1));
 
         // test stable metadata state
-        doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
+        doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2)))
                 .when(loadManager).selectAsync(any(), any());
         leaderChannel.handleMetadataSessionEvent(SessionReestablished);
         followerChannel.handleMetadataSessionEvent(SessionReestablished);
@@ -1481,11 +1481,11 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
         followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
 
-        waitUntilNewOwner(channel2, releasingBundle, lookupServiceAddress2);
-        waitUntilNewOwner(channel2, childBundle11, lookupServiceAddress2);
-        waitUntilNewOwner(channel2, childBundle12, lookupServiceAddress2);
-        waitUntilNewOwner(channel2, assigningBundle, lookupServiceAddress2);
-        waitUntilNewOwner(channel2, ownedBundle, lookupServiceAddress2);
+        waitUntilNewOwner(channel2, releasingBundle, brokerId2);
+        waitUntilNewOwner(channel2, childBundle11, brokerId2);
+        waitUntilNewOwner(channel2, childBundle12, brokerId2);
+        waitUntilNewOwner(channel2, assigningBundle, brokerId2);
+        waitUntilNewOwner(channel2, ownedBundle, brokerId2);
         assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get());
         assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
         assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
@@ -1505,12 +1505,12 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         String leader = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
         String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
         assertEquals(leader, leader2);
-        if (leader.equals(lookupServiceAddress2)) {
+        if (leader.equals(brokerId2)) {
             leaderChannel = channel2;
             followerChannel = channel1;
         }
 
-        String broker = lookupServiceAddress1;
+        String broker = brokerId1;
 
         // test override states
         String releasingBundle = "public/releasing/0xfffffff0_0xffffffff";
@@ -1535,19 +1535,19 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 new ServiceUnitStateData(Owned, broker, null, 1));
 
         // test stable metadata state
-        doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
+        doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2)))
                 .when(loadManager).selectAsync(any(), any());
         FieldUtils.writeDeclaredField(leaderChannel, "inFlightStateWaitingTimeInMillis",
                 -1, true);
         FieldUtils.writeDeclaredField(followerChannel, "inFlightStateWaitingTimeInMillis",
                 -1, true);
         ((ServiceUnitStateChannelImpl) leaderChannel)
-                .monitorOwnerships(List.of(lookupServiceAddress1, lookupServiceAddress2));
+                .monitorOwnerships(List.of(brokerId1, brokerId2));
 
         waitUntilNewOwner(channel2, releasingBundle, broker);
         waitUntilNewOwner(channel2, childBundle11, broker);
         waitUntilNewOwner(channel2, childBundle12, broker);
-        waitUntilNewOwner(channel2, assigningBundle, lookupServiceAddress2);
+        waitUntilNewOwner(channel2, assigningBundle, brokerId2);
         waitUntilNewOwner(channel2, ownedBundle, broker);
         assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get());
         assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
@@ -1566,7 +1566,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
 
 
         // set the bundle owner is the broker
-        String broker = lookupServiceAddress2;
+        String broker = brokerId2;
         String bundle = "public/owned/0xfffffff0_0xffffffff";
         overrideTableViews(bundle,
                 new ServiceUnitStateData(Owned, broker, null, 1));
@@ -1596,7 +1596,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         String leader1 = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
         String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
         assertEquals(leader1, leader2);
-        if (leader1.equals(lookupServiceAddress2)) {
+        if (leader1.equals(brokerId2)) {
             leaderChannel = channel2;
         }
         leaderChannel.handleMetadataSessionEvent(SessionReestablished);
@@ -1611,10 +1611,10 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty());
         assertTrue(System.currentTimeMillis() - start < 20_000);
 
-        // simulate ownership cleanup(lookupServiceAddress1 selected owner) by the leader channel
+        // simulate ownership cleanup(brokerId1 selected owner) by the leader channel
         overrideTableViews(bundle,
                 new ServiceUnitStateData(Owned, broker, null, 1));
-        doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
+        doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1)))
                 .when(loadManager).selectAsync(any(), any());
         leaderChannel.handleMetadataSessionEvent(SessionReestablished);
         FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
@@ -1622,9 +1622,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         getCleanupJobs(leaderChannel).clear();
         leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
 
-        // verify the ownership cleanup, and channel's getOwnerAsync returns lookupServiceAddress1 without timeout
+        // verify the ownership cleanup, and channel's getOwnerAsync returns brokerId1 without timeout
         start = System.currentTimeMillis();
-        assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get().get());
+        assertEquals(brokerId1, channel1.getOwnerAsync(bundle).get().get());
         assertTrue(System.currentTimeMillis() - start < 20_000);
 
         // test clean-up
@@ -1746,7 +1746,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 .atMost(10, TimeUnit.SECONDS)
                 .until(() -> { // wait until true
                     ((ServiceUnitStateChannelImpl) channel)
-                            .monitorOwnerships(List.of(lookupServiceAddress1, lookupServiceAddress2));
+                            .monitorOwnerships(List.of(brokerId1, brokerId2));
                     ServiceUnitStateData data = tv.get(key);
                     ServiceUnitState actual = state(data);
                     return actual == expected;
@@ -1968,7 +1968,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
 
 
         var leaderElectionService = new LeaderElectionService(
-                pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(),
+                pulsar.getCoordinationService(), pulsar.getBrokerId(), pulsar.getSafeWebServiceAddress(),
                 state -> {
                     if (state == LeaderElectionState.Leading) {
                         channel.scheduleOwnershipMonitor();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
index f45e1405e1d..87aaf4bac7f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
@@ -84,20 +84,20 @@ public class BrokerIsolationPoliciesFilterTest {
 
         // a. available-brokers: broker1, broker2, broker3 => result: broker1
         Map<String, BrokerLookupData> result = filter.filterAsync(new HashMap<>(Map.of(
-                "broker1", getLookupData(),
-                "broker2", getLookupData(),
-                "broker3", getLookupData())), namespaceName, getContext()).get();
-        assertEquals(result.keySet(), Set.of("broker1"));
+                "broker1:8080", getLookupData(),
+                "broker2:8080", getLookupData(),
+                "broker3:8080", getLookupData())), namespaceName, getContext()).get();
+        assertEquals(result.keySet(), Set.of("broker1:8080"));
 
         // b. available-brokers: broker2, broker3          => result: broker2
         result = filter.filterAsync(new HashMap<>(Map.of(
-                "broker2", getLookupData(),
-                "broker3", getLookupData())), namespaceName, getContext()).get();
-        assertEquals(result.keySet(), Set.of("broker2"));
+                "broker2:8080", getLookupData(),
+                "broker3:8080", getLookupData())), namespaceName, getContext()).get();
+        assertEquals(result.keySet(), Set.of("broker2:8080"));
 
         // c. available-brokers: broker3                   => result: NULL
         result = filter.filterAsync(new HashMap<>(Map.of(
-                "broker3", getLookupData())), namespaceName, getContext()).get();
+                "broker3:8080", getLookupData())), namespaceName, getContext()).get();
         assertTrue(result.isEmpty());
 
         // 2. Namespace: primary=broker1, secondary=broker2, shared=broker3, min_limit = 2
@@ -105,20 +105,20 @@ public class BrokerIsolationPoliciesFilterTest {
 
         // a. available-brokers: broker1, broker2, broker3 => result: broker1, broker2
         result = filter.filterAsync(new HashMap<>(Map.of(
-                "broker1", getLookupData(),
-                "broker2", getLookupData(),
-                "broker3", getLookupData())), namespaceName, getContext()).get();
-        assertEquals(result.keySet(), Set.of("broker1", "broker2"));
+                "broker1:8080", getLookupData(),
+                "broker2:8080", getLookupData(),
+                "broker3:8080", getLookupData())), namespaceName, getContext()).get();
+        assertEquals(result.keySet(), Set.of("broker1:8080", "broker2:8080"));
 
         // b. available-brokers: broker2, broker3          => result: broker2
         result = filter.filterAsync(new HashMap<>(Map.of(
-                "broker2", getLookupData(),
-                "broker3", getLookupData())), namespaceName, getContext()).get();
-        assertEquals(result.keySet(), Set.of("broker2"));
+                "broker2:8080", getLookupData(),
+                "broker3:8080", getLookupData())), namespaceName, getContext()).get();
+        assertEquals(result.keySet(), Set.of("broker2:8080"));
 
         // c. available-brokers: broker3                   => result: NULL
         result = filter.filterAsync(new HashMap<>(Map.of(
-                "broker3", getLookupData())), namespaceName, getContext()).get();
+                "broker3:8080", getLookupData())), namespaceName, getContext()).get();
         assertTrue(result.isEmpty());
     }
 
@@ -142,31 +142,31 @@ public class BrokerIsolationPoliciesFilterTest {
 
 
         Map<String, BrokerLookupData> result = filter.filterAsync(new HashMap<>(Map.of(
-                "broker1", getLookupData(),
-                "broker2", getLookupData(),
-                "broker3", getLookupData())), namespaceBundle, getContext()).get();
-        assertEquals(result.keySet(), Set.of("broker1", "broker2", "broker3"));
+                "broker1:8080", getLookupData(),
+                "broker2:8080", getLookupData(),
+                "broker3:8080", getLookupData())), namespaceBundle, getContext()).get();
+        assertEquals(result.keySet(), Set.of("broker1:8080", "broker2:8080", "broker3:8080"));
 
 
         result = filter.filterAsync(new HashMap<>(Map.of(
-                "broker1", getLookupData(true, false),
-                "broker2", getLookupData(true, false),
-                "broker3", getLookupData())), namespaceBundle, getContext()).get();
-        assertEquals(result.keySet(), Set.of("broker3"));
+                "broker1:8080", getLookupData(true, false),
+                "broker2:8080", getLookupData(true, false),
+                "broker3:8080", getLookupData())), namespaceBundle, getContext()).get();
+        assertEquals(result.keySet(), Set.of("broker3:8080"));
 
         doReturn(false).when(namespaceBundle).hasNonPersistentTopic();
 
         result = filter.filterAsync(new HashMap<>(Map.of(
-                "broker1", getLookupData(),
-                "broker2", getLookupData(),
-                "broker3", getLookupData())), namespaceBundle, getContext()).get();
-        assertEquals(result.keySet(), Set.of("broker1", "broker2", "broker3"));
+                "broker1:8080", getLookupData(),
+                "broker2:8080", getLookupData(),
+                "broker3:8080", getLookupData())), namespaceBundle, getContext()).get();
+        assertEquals(result.keySet(), Set.of("broker1:8080", "broker2:8080", "broker3:8080"));
 
         result = filter.filterAsync(new HashMap<>(Map.of(
-                "broker1", getLookupData(false, true),
-                "broker2", getLookupData(),
-                "broker3", getLookupData())), namespaceBundle, getContext()).get();
-        assertEquals(result.keySet(), Set.of("broker2", "broker3"));
+                "broker1:8080", getLookupData(false, true),
+                "broker2:8080", getLookupData(),
+                "broker3:8080", getLookupData())), namespaceBundle, getContext()).get();
+        assertEquals(result.keySet(), Set.of("broker2:8080", "broker3:8080"));
     }
 
     private void setIsolationPolicies(SimpleResourceAllocationPolicies policies,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 4eec6124777..0ff64616973 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -100,6 +100,7 @@ import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+import org.assertj.core.api.Assertions;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -168,18 +169,18 @@ public class TransferShedderTest {
         var ctx = getContext();
 
         var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
-        topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("my-tenant/my-namespaceA", 1000000, 2000000));
-        topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("my-tenant/my-namespaceB", 1000000, 3000000));
-        topBundlesLoadDataStore.pushAsync("broker3", getTopBundlesLoad("my-tenant/my-namespaceC", 2000000, 4000000));
-        topBundlesLoadDataStore.pushAsync("broker4", getTopBundlesLoad("my-tenant/my-namespaceD", 2000000, 6000000));
-        topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("my-tenant/my-namespaceE", 2000000, 7000000));
+        topBundlesLoadDataStore.pushAsync("broker1:8080", getTopBundlesLoad("my-tenant/my-namespaceA", 1000000, 2000000));
+        topBundlesLoadDataStore.pushAsync("broker2:8080", getTopBundlesLoad("my-tenant/my-namespaceB", 1000000, 3000000));
+        topBundlesLoadDataStore.pushAsync("broker3:8080", getTopBundlesLoad("my-tenant/my-namespaceC", 2000000, 4000000));
+        topBundlesLoadDataStore.pushAsync("broker4:8080", getTopBundlesLoad("my-tenant/my-namespaceD", 2000000, 6000000));
+        topBundlesLoadDataStore.pushAsync("broker5:8080", getTopBundlesLoad("my-tenant/my-namespaceE", 2000000, 7000000));
 
         var brokerLoadDataStore = ctx.brokerLoadDataStore();
-        brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx, 2, "broker1"));
-        brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx, 4, "broker2"));
-        brokerLoadDataStore.pushAsync("broker3", getCpuLoad(ctx, 6, "broker3"));
-        brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx, 80, "broker4"));
-        brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx, 90, "broker5"));
+        brokerLoadDataStore.pushAsync("broker1:8080", getCpuLoad(ctx, 2, "broker1:8080"));
+        brokerLoadDataStore.pushAsync("broker2:8080", getCpuLoad(ctx, 4, "broker2:8080"));
+        brokerLoadDataStore.pushAsync("broker3:8080", getCpuLoad(ctx, 6, "broker3:8080"));
+        brokerLoadDataStore.pushAsync("broker4:8080", getCpuLoad(ctx, 80, "broker4:8080"));
+        brokerLoadDataStore.pushAsync("broker5:8080", getCpuLoad(ctx, 90, "broker5:8080"));
         return ctx;
     }
 
@@ -192,9 +193,9 @@ public class TransferShedderTest {
         Random rand = new Random();
         for (int i = 0; i < clusterSize; i++) {
             int brokerLoad = rand.nextInt(1000);
-            brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,  brokerLoad, "broker" + i));
+            brokerLoadDataStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx,  brokerLoad, "broker" + i + ":8080"));
             int bundleLoad = rand.nextInt(brokerLoad + 1);
-            topBundlesLoadDataStore.pushAsync("broker" + i, getTopBundlesLoad("my-tenant/my-namespace" + i,
+            topBundlesLoadDataStore.pushAsync("broker" + i + ":8080", getTopBundlesLoad("my-tenant/my-namespace" + i,
                     bundleLoad, brokerLoad - bundleLoad));
         }
         return ctx;
@@ -209,14 +210,14 @@ public class TransferShedderTest {
         int i = 0;
         for (; i < clusterSize-1; i++) {
             int brokerLoad = 1;
-            topBundlesLoadDataStore.pushAsync("broker" + i, getTopBundlesLoad("my-tenant/my-namespace" + i,
+            topBundlesLoadDataStore.pushAsync("broker" + i + ":8080", getTopBundlesLoad("my-tenant/my-namespace" + i,
                     300_000, 700_000));
-            brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,  brokerLoad, "broker" + i));
+            brokerLoadDataStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx,  brokerLoad, "broker" + i + ":8080"));
         }
         int brokerLoad = 100;
-        topBundlesLoadDataStore.pushAsync("broker" + i, getTopBundlesLoad("my-tenant/my-namespace" + i,
+        topBundlesLoadDataStore.pushAsync("broker" + i + ":8080", getTopBundlesLoad("my-tenant/my-namespace" + i,
                 30_000_000, 70_000_000));
-        brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,  brokerLoad, "broker" + i));
+        brokerLoadDataStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx,  brokerLoad, "broker" + i + ":8080"));
 
         return ctx;
     }
@@ -230,21 +231,21 @@ public class TransferShedderTest {
         int i = 0;
         for (; i < clusterSize-2; i++) {
             int brokerLoad = 98;
-            topBundlesLoadDataStore.pushAsync("broker" + i, getTopBundlesLoad("my-tenant/my-namespace" + i,
+            topBundlesLoadDataStore.pushAsync("broker" + i + ":8080", getTopBundlesLoad("my-tenant/my-namespace" + i,
                     30_000_000, 70_000_000));
-            brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,  brokerLoad, "broker" + i));
+            brokerLoadDataStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx,  brokerLoad, "broker" + i + ":8080"));
         }
 
         int brokerLoad = 99;
-        topBundlesLoadDataStore.pushAsync("broker" + i, getTopBundlesLoad("my-tenant/my-namespace" + i,
+        topBundlesLoadDataStore.pushAsync("broker" + i + ":8080", getTopBundlesLoad("my-tenant/my-namespace" + i,
                 30_000_000, 70_000_000));
-        brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,  brokerLoad, "broker" + i));
+        brokerLoadDataStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx,  brokerLoad, "broker" + i + ":8080"));
         i++;
 
         brokerLoad = 1;
-        topBundlesLoadDataStore.pushAsync("broker" + i, getTopBundlesLoad("my-tenant/my-namespace" + i,
+        topBundlesLoadDataStore.pushAsync("broker" + i + ":8080", getTopBundlesLoad("my-tenant/my-namespace" + i,
                 300_000, 700_000));
-        brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,  brokerLoad, "broker" + i));
+        brokerLoadDataStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx,  brokerLoad, "broker" + i + ":8080"));
         return ctx;
     }
 
@@ -474,11 +475,11 @@ public class TransferShedderTest {
 
         BrokerRegistry brokerRegistry = mock(BrokerRegistry.class);
         doReturn(CompletableFuture.completedFuture(Map.of(
-                "broker1", getMockBrokerLookupData(),
-                "broker2", getMockBrokerLookupData(),
-                "broker3", getMockBrokerLookupData(),
-                "broker4", getMockBrokerLookupData(),
-                "broker5", getMockBrokerLookupData()
+                "broker1:8080", getMockBrokerLookupData(),
+                "broker2:8080", getMockBrokerLookupData(),
+                "broker3:8080", getMockBrokerLookupData(),
+                "broker4:8080", getMockBrokerLookupData(),
+                "broker5:8080", getMockBrokerLookupData()
         ))).when(brokerRegistry).getAvailableBrokerLookupDataAsync();
         doReturn(conf).when(ctx).brokerConfiguration();
         doReturn(brokerLoadDataStore).when(ctx).brokerLoadDataStore();
@@ -526,11 +527,11 @@ public class TransferShedderTest {
         var ctx = getContext();
         var brokerLoadDataStore = ctx.brokerLoadDataStore();
 
-        brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx, 2, "broker1"));
-        brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx, 4, "broker2"));
-        brokerLoadDataStore.pushAsync("broker3", getCpuLoad(ctx, 6, "broker3"));
-        brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx, 80, "broker4"));
-        brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx, 90, "broker5"));
+        brokerLoadDataStore.pushAsync("broker1:8080", getCpuLoad(ctx, 2, "broker1:8080"));
+        brokerLoadDataStore.pushAsync("broker2:8080", getCpuLoad(ctx, 4, "broker2:8080"));
+        brokerLoadDataStore.pushAsync("broker3:8080", getCpuLoad(ctx, 6, "broker3:8080"));
+        brokerLoadDataStore.pushAsync("broker4:8080", getCpuLoad(ctx, 80, "broker4:8080"));
+        brokerLoadDataStore.pushAsync("broker5:8080", getCpuLoad(ctx, 90, "broker5:8080"));
 
 
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
@@ -550,11 +551,11 @@ public class TransferShedderTest {
         assertEquals(res.size(), 2);
 
 
-        FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker1").get(), "updatedAt", 0, true);
-        FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker2").get(), "updatedAt", 0, true);
-        FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker3").get(), "updatedAt", 0, true);
-        FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker4").get(), "updatedAt", 0, true);
-        FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker5").get(), "updatedAt", 0, true);
+        FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker1:8080").get(), "updatedAt", 0, true);
+        FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker2:8080").get(), "updatedAt", 0, true);
+        FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker3:8080").get(), "updatedAt", 0, true);
+        FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker4:8080").get(), "updatedAt", 0, true);
+        FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker5:8080").get(), "updatedAt", 0, true);
 
 
         res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
@@ -571,20 +572,20 @@ public class TransferShedderTest {
         Map<String, Long> recentlyUnloadedBrokers = new HashMap<>();
         var oldTS = System.currentTimeMillis() - ctx.brokerConfiguration()
                 .getLoadBalancerBrokerLoadDataTTLInSeconds() * 1001;
-        recentlyUnloadedBrokers.put("broker1", oldTS);
+        recentlyUnloadedBrokers.put("broker1:8080", oldTS);
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), recentlyUnloadedBrokers);
 
         var expected = new HashSet<UnloadDecision>();
-        expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+        expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
                 Success, Overloaded));
-        expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")),
+        expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
                 Success, Overloaded));
         assertEquals(res, expected);
         assertEquals(counter.getLoadAvg(), setupLoadAvg);
         assertEquals(counter.getLoadStd(), setupLoadStd);
 
         var now = System.currentTimeMillis();
-        recentlyUnloadedBrokers.put("broker1", now);
+        recentlyUnloadedBrokers.put("broker1:8080", now);
         res = transferShedder.findBundlesForUnloading(ctx, Map.of(), recentlyUnloadedBrokers);
 
         assertTrue(res.isEmpty());
@@ -604,9 +605,9 @@ public class TransferShedderTest {
         recentlyUnloadedBundles.put(bundleD2, now);
         var res = transferShedder.findBundlesForUnloading(ctx, recentlyUnloadedBundles, Map.of());
         var expected = new HashSet<UnloadDecision>();
-        expected.add(new UnloadDecision(new Unload("broker3",
+        expected.add(new UnloadDecision(new Unload("broker3:8080",
                 "my-tenant/my-namespaceC/0x00000000_0x0FFFFFFF",
-                Optional.of("broker1")),
+                Optional.of("broker1:8080")),
                 Success, Overloaded));
         assertEquals(res, expected);
         assertEquals(counter.getLoadAvg(), setupLoadAvg);
@@ -639,13 +640,13 @@ public class TransferShedderTest {
                 isolationPoliciesHelper, antiAffinityGroupPolicyHelper));
 
         setIsolationPolicies(allocationPoliciesSpy, "my-tenant/my-namespaceE",
-                Set.of("broker5"), Set.of(), Set.of(), 1);
+                Set.of("broker5:8080"), Set.of(), Set.of(), 1);
         var ctx = setupContext();
         ctx.brokerConfiguration().setLoadBalancerSheddingBundlesWithPoliciesEnabled(true);
         doReturn(ctx.brokerConfiguration()).when(pulsar).getConfiguration();
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
         var expected = new HashSet<UnloadDecision>();
-        expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker1")),
+        expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker1:8080")),
                 Success, Overloaded));
         assertEquals(res, expected);
         assertEquals(counter.getLoadAvg(), setupLoadAvg);
@@ -656,7 +657,7 @@ public class TransferShedderTest {
         res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
 
         expected = new HashSet<>();
-        expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.empty()),
+        expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.empty()),
                 Success, Overloaded));
         assertEquals(res, expected);
         assertEquals(counter.getLoadAvg(), setupLoadAvg);
@@ -777,7 +778,7 @@ public class TransferShedderTest {
         }).when(antiAffinityGroupPolicyHelper).filterAsync(any(), any());
         var res2 = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
         var expected2 = new HashSet<>();
-        expected2.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+        expected2.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
                 Success, Overloaded));
         assertEquals(res2, expected2);
         assertEquals(counter.getLoadAvg(), setupLoadAvg);
@@ -853,22 +854,22 @@ public class TransferShedderTest {
         var ctx = getContext();
         BrokerRegistry brokerRegistry = mock(BrokerRegistry.class);
         doReturn(CompletableFuture.completedFuture(Map.of(
-                "broker1", mock(BrokerLookupData.class),
-                "broker2", mock(BrokerLookupData.class),
-                "broker3", mock(BrokerLookupData.class)
+                "broker1:8080", mock(BrokerLookupData.class),
+                "broker2:8080", mock(BrokerLookupData.class),
+                "broker3:8080", mock(BrokerLookupData.class)
         ))).when(brokerRegistry).getAvailableBrokerLookupDataAsync();
         doReturn(brokerRegistry).when(ctx).brokerRegistry();
         ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
         var brokerLoadDataStore = ctx.brokerLoadDataStore();
-        brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx, 10, "broker1"));
-        brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx, 20, "broker2"));
-        brokerLoadDataStore.pushAsync("broker3", getCpuLoad(ctx, 30, "broker3"));
+        brokerLoadDataStore.pushAsync("broker1:8080", getCpuLoad(ctx, 10, "broker1:8080"));
+        brokerLoadDataStore.pushAsync("broker2:8080", getCpuLoad(ctx, 20, "broker2:8080"));
+        brokerLoadDataStore.pushAsync("broker3:8080", getCpuLoad(ctx, 30, "broker3:8080"));
 
         var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
 
-        topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("my-tenant/my-namespaceA", 30, 30));
-        topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("my-tenant/my-namespaceB", 40, 40));
-        topBundlesLoadDataStore.pushAsync("broker3", getTopBundlesLoad("my-tenant/my-namespaceC", 50, 50));
+        topBundlesLoadDataStore.pushAsync("broker1:8080", getTopBundlesLoad("my-tenant/my-namespaceA", 30, 30));
+        topBundlesLoadDataStore.pushAsync("broker2:8080", getTopBundlesLoad("my-tenant/my-namespaceB", 40, 40));
+        topBundlesLoadDataStore.pushAsync("broker3:8080", getTopBundlesLoad("my-tenant/my-namespaceC", 50, 50));
 
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
 
@@ -884,11 +885,11 @@ public class TransferShedderTest {
         TransferShedder transferShedder = new TransferShedder(counter);
         var ctx = setupContext();
         var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
-        topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("my-tenant/my-namespaceA", 1));
-        topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("my-tenant/my-namespaceB", 2));
-        topBundlesLoadDataStore.pushAsync("broker3", getTopBundlesLoad("my-tenant/my-namespaceC", 6));
-        topBundlesLoadDataStore.pushAsync("broker4", getTopBundlesLoad("my-tenant/my-namespaceD", 10));
-        topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("my-tenant/my-namespaceE", 70));
+        topBundlesLoadDataStore.pushAsync("broker1:8080", getTopBundlesLoad("my-tenant/my-namespaceA", 1));
+        topBundlesLoadDataStore.pushAsync("broker2:8080", getTopBundlesLoad("my-tenant/my-namespaceB", 2));
+        topBundlesLoadDataStore.pushAsync("broker3:8080", getTopBundlesLoad("my-tenant/my-namespaceC", 6));
+        topBundlesLoadDataStore.pushAsync("broker4:8080", getTopBundlesLoad("my-tenant/my-namespaceD", 10));
+        topBundlesLoadDataStore.pushAsync("broker5:8080", getTopBundlesLoad("my-tenant/my-namespaceE", 70));
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
 
         assertTrue(res.isEmpty());
@@ -903,14 +904,14 @@ public class TransferShedderTest {
         TransferShedder transferShedder = new TransferShedder(counter);
         var ctx = setupContext();
         var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
-        topBundlesLoadDataStore.pushAsync("broker4", getTopBundlesLoad("my-tenant/my-namespaceD", 1000000000, 1000000000));
-        topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("my-tenant/my-namespaceE", 1000000000, 1000000000));
+        topBundlesLoadDataStore.pushAsync("broker4:8080", getTopBundlesLoad("my-tenant/my-namespaceD", 1000000000, 1000000000));
+        topBundlesLoadDataStore.pushAsync("broker5:8080", getTopBundlesLoad("my-tenant/my-namespaceE", 1000000000, 1000000000));
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
 
         var expected = new HashSet<UnloadDecision>();
-        expected.add(new UnloadDecision(new Unload("broker3",
+        expected.add(new UnloadDecision(new Unload("broker3:8080",
                 "my-tenant/my-namespaceC/0x00000000_0x0FFFFFFF",
-                Optional.of("broker1")),
+                Optional.of("broker1:8080")),
                 Success, Overloaded));
         assertEquals(res, expected);
         assertEquals(counter.getLoadAvg(), setupLoadAvg);
@@ -924,12 +925,12 @@ public class TransferShedderTest {
         TransferShedder transferShedder = new TransferShedder(counter);
         var ctx = setupContext();
         var brokerLoadDataStore = ctx.brokerLoadDataStore();
-        brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx,  55, "broker4"));
-        brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx,  65, "broker5"));
+        brokerLoadDataStore.pushAsync("broker4:8080", getCpuLoad(ctx,  55, "broker4:8080"));
+        brokerLoadDataStore.pushAsync("broker5:8080", getCpuLoad(ctx,  65, "broker5:8080"));
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
 
         var expected = new HashSet<UnloadDecision>();
-        expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+        expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
                 Success, Overloaded));
         assertEquals(res, expected);
         assertEquals(counter.getLoadAvg(), 0.26400000000000007);
@@ -945,43 +946,43 @@ public class TransferShedderTest {
         var brokerRegistry = mock(BrokerRegistry.class);
         doReturn(brokerRegistry).when(ctx).brokerRegistry();
         doReturn(CompletableFuture.completedFuture(Map.of(
-                "broker1", mock(BrokerLookupData.class),
-                "broker2", mock(BrokerLookupData.class)
+                "broker1:8080", mock(BrokerLookupData.class),
+                "broker2:8080", mock(BrokerLookupData.class)
         ))).when(brokerRegistry).getAvailableBrokerLookupDataAsync();
 
         var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
-        topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("my-tenant/my-namespaceA", 1000000, 2000000, 3000000, 4000000, 5000000));
-        topBundlesLoadDataStore.pushAsync("broker2",
+        topBundlesLoadDataStore.pushAsync("broker1:8080", getTopBundlesLoad("my-tenant/my-namespaceA", 1000000, 2000000, 3000000, 4000000, 5000000));
+        topBundlesLoadDataStore.pushAsync("broker2:8080",
                 getTopBundlesLoad("my-tenant/my-namespaceB", 100000000, 180000000, 220000000, 250000000, 250000000));
 
 
         var brokerLoadDataStore = ctx.brokerLoadDataStore();
-        brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx, 10, "broker1"));
-        brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx, 1000, "broker2"));
+        brokerLoadDataStore.pushAsync("broker1:8080", getCpuLoad(ctx, 10, "broker1:8080"));
+        brokerLoadDataStore.pushAsync("broker2:8080", getCpuLoad(ctx, 1000, "broker2:8080"));
 
 
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
         var expected = new HashSet<UnloadDecision>();
         expected.add(new UnloadDecision(
-                new Unload("broker2", "my-tenant/my-namespaceB/0x00000000_0x1FFFFFFF", Optional.of("broker1")),
+                new Unload("broker2:8080", "my-tenant/my-namespaceB/0x00000000_0x1FFFFFFF", Optional.of("broker1:8080")),
                 Success, Overloaded));
         expected.add(new UnloadDecision(
-                new Unload("broker2", "my-tenant/my-namespaceB/0x1FFFFFFF_0x2FFFFFFF", Optional.of("broker1")),
+                new Unload("broker2:8080", "my-tenant/my-namespaceB/0x1FFFFFFF_0x2FFFFFFF", Optional.of("broker1:8080")),
                 Success, Overloaded));
         expected.add(new UnloadDecision(
-                new Unload("broker2", "my-tenant/my-namespaceB/0x2FFFFFFF_0x3FFFFFFF", Optional.of("broker1")),
+                new Unload("broker2:8080", "my-tenant/my-namespaceB/0x2FFFFFFF_0x3FFFFFFF", Optional.of("broker1:8080")),
                 Success, Overloaded));
         expected.add(new UnloadDecision(
-                new Unload("broker1", "my-tenant/my-namespaceA/0x00000000_0x1FFFFFFF", Optional.of("broker2")),
+                new Unload("broker1:8080", "my-tenant/my-namespaceA/0x00000000_0x1FFFFFFF", Optional.of("broker2:8080")),
                 Success, Overloaded));
         expected.add(new UnloadDecision(
-                new Unload("broker1", "my-tenant/my-namespaceA/0x1FFFFFFF_0x2FFFFFFF", Optional.of("broker2")),
+                new Unload("broker1:8080", "my-tenant/my-namespaceA/0x1FFFFFFF_0x2FFFFFFF", Optional.of("broker2:8080")),
                 Success, Overloaded));
         expected.add(new UnloadDecision(
-                new Unload("broker1","my-tenant/my-namespaceA/0x2FFFFFFF_0x3FFFFFFF", Optional.of("broker2")),
+                new Unload("broker1:8080","my-tenant/my-namespaceA/0x2FFFFFFF_0x3FFFFFFF", Optional.of("broker2:8080")),
                 Success, Overloaded));
         expected.add(new UnloadDecision(
-                new Unload("broker1","my-tenant/my-namespaceA/0x3FFFFFFF_0x4FFFFFFF", Optional.of("broker2")),
+                new Unload("broker1:8080","my-tenant/my-namespaceA/0x3FFFFFFF_0x4FFFFFFF", Optional.of("broker2:8080")),
                 Success, Overloaded));
         assertEquals(counter.getLoadAvg(), 5.05);
         assertEquals(counter.getLoadStd(), 4.95);
@@ -1001,20 +1002,20 @@ public class TransferShedderTest {
         var brokerRegistry = mock(BrokerRegistry.class);
         doReturn(brokerRegistry).when(ctx).brokerRegistry();
         doReturn(CompletableFuture.completedFuture(Map.of(
-                "broker1", mock(BrokerLookupData.class),
-                "broker2", mock(BrokerLookupData.class)
+                "broker1:8080", mock(BrokerLookupData.class),
+                "broker2:8080", mock(BrokerLookupData.class)
         ))).when(brokerRegistry).getAvailableBrokerLookupDataAsync();
 
         var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
-        topBundlesLoadDataStore.pushAsync("broker1",
+        topBundlesLoadDataStore.pushAsync("broker1:8080",
                 getTopBundlesLoad("my-tenant/my-namespaceA", 1, 500000000));
-        topBundlesLoadDataStore.pushAsync("broker2",
+        topBundlesLoadDataStore.pushAsync("broker2:8080",
                 getTopBundlesLoad("my-tenant/my-namespaceB", 500000000, 500000000));
 
 
         var brokerLoadDataStore = ctx.brokerLoadDataStore();
-        brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx, 50, "broker1"));
-        brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx, 100, "broker2"));
+        brokerLoadDataStore.pushAsync("broker1:8080", getCpuLoad(ctx, 50, "broker1:8080"));
+        brokerLoadDataStore.pushAsync("broker2:8080", getCpuLoad(ctx, 100, "broker2:8080"));
 
 
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
@@ -1032,24 +1033,24 @@ public class TransferShedderTest {
         var brokerRegistry = mock(BrokerRegistry.class);
         doReturn(brokerRegistry).when(ctx).brokerRegistry();
         doReturn(CompletableFuture.completedFuture(Map.of(
-                "broker1", mock(BrokerLookupData.class),
-                "broker2", mock(BrokerLookupData.class)
+                "broker1:8080", mock(BrokerLookupData.class),
+                "broker2:8080", mock(BrokerLookupData.class)
         ))).when(brokerRegistry).getAvailableBrokerLookupDataAsync();
 
         var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
-        topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("my-tenant/my-namespaceA", 1000000, 2000000, 3000000, 4000000, 5000000));
-        topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("my-tenant/my-namespaceB", 490000000, 510000000));
+        topBundlesLoadDataStore.pushAsync("broker1:8080", getTopBundlesLoad("my-tenant/my-namespaceA", 1000000, 2000000, 3000000, 4000000, 5000000));
+        topBundlesLoadDataStore.pushAsync("broker2:8080", getTopBundlesLoad("my-tenant/my-namespaceB", 490000000, 510000000));
 
 
         var brokerLoadDataStore = ctx.brokerLoadDataStore();
-        brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx, 10, "broker1"));
-        brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx, 1000, "broker2"));
+        brokerLoadDataStore.pushAsync("broker1:8080", getCpuLoad(ctx, 10, "broker1:8080"));
+        brokerLoadDataStore.pushAsync("broker2:8080", getCpuLoad(ctx, 1000, "broker2:8080"));
 
 
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
         var expected = new HashSet<UnloadDecision>();
         expected.add(new UnloadDecision(
-                new Unload("broker2", "my-tenant/my-namespaceB/0x00000000_0x0FFFFFFF", Optional.of("broker1")),
+                new Unload("broker2:8080", "my-tenant/my-namespaceB/0x00000000_0x0FFFFFFF", Optional.of("broker1:8080")),
                 Success, Overloaded));
         assertEquals(counter.getLoadAvg(), 5.05);
         assertEquals(counter.getLoadStd(), 4.95);
@@ -1070,30 +1071,30 @@ public class TransferShedderTest {
         var brokerRegistry = mock(BrokerRegistry.class);
         doReturn(brokerRegistry).when(ctx).brokerRegistry();
         doReturn(CompletableFuture.completedFuture(Map.of(
-                "broker1", mock(BrokerLookupData.class),
-                "broker2", mock(BrokerLookupData.class)
+                "broker1:8080", mock(BrokerLookupData.class),
+                "broker2:8080", mock(BrokerLookupData.class)
         ))).when(brokerRegistry).getAvailableBrokerLookupDataAsync();
 
         var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
-        topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("my-tenant/my-namespaceA", 2400000, 2400000));
-        topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("my-tenant/my-namespaceB", 5000000, 5000000));
+        topBundlesLoadDataStore.pushAsync("broker1:8080", getTopBundlesLoad("my-tenant/my-namespaceA", 2400000, 2400000));
+        topBundlesLoadDataStore.pushAsync("broker2:8080", getTopBundlesLoad("my-tenant/my-namespaceB", 5000000, 5000000));
 
 
         var brokerLoadDataStore = ctx.brokerLoadDataStore();
-        brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx, 48, "broker1"));
-        brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx, 100, "broker2"));
+        brokerLoadDataStore.pushAsync("broker1:8080", getCpuLoad(ctx, 48, "broker1:8080"));
+        brokerLoadDataStore.pushAsync("broker2:8080", getCpuLoad(ctx, 100, "broker2:8080"));
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
 
         var expected = new HashSet<UnloadDecision>();
         expected.add(new UnloadDecision(
-                new Unload("broker1",
-                        res.stream().filter(x -> x.getUnload().sourceBroker().equals("broker1")).findFirst().get()
-                        .getUnload().serviceUnit(), Optional.of("broker2")),
+                new Unload("broker1:8080",
+                        res.stream().filter(x -> x.getUnload().sourceBroker().equals("broker1:8080")).findFirst().get()
+                        .getUnload().serviceUnit(), Optional.of("broker2:8080")),
                 Success, Overloaded));
         expected.add(new UnloadDecision(
-                new Unload("broker2",
-                        res.stream().filter(x -> x.getUnload().sourceBroker().equals("broker2")).findFirst().get()
-                                .getUnload().serviceUnit(), Optional.of("broker1")),
+                new Unload("broker2:8080",
+                        res.stream().filter(x -> x.getUnload().sourceBroker().equals("broker2:8080")).findFirst().get()
+                                .getUnload().serviceUnit(), Optional.of("broker1:8080")),
                 Success, Overloaded));
         assertEquals(counter.getLoadAvg(), 0.74);
         assertEquals(counter.getLoadStd(), 0.26);
@@ -1111,18 +1112,18 @@ public class TransferShedderTest {
         var ctx = setupContext();
         var brokerLoadDataStore = ctx.brokerLoadDataStore();
 
-        var load = getCpuLoad(ctx,  4, "broker2");
+        var load = getCpuLoad(ctx,  4, "broker2:8080");
         FieldUtils.writeDeclaredField(load,"msgThroughputEMA", 0, true);
-        brokerLoadDataStore.pushAsync("broker2", load);
-        brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx,  55, "broker4"));
-        brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx,  65, "broker5"));
+        brokerLoadDataStore.pushAsync("broker2:8080", load);
+        brokerLoadDataStore.pushAsync("broker4:8080", getCpuLoad(ctx,  55, "broker4:8080"));
+        brokerLoadDataStore.pushAsync("broker5:8080", getCpuLoad(ctx,  65, "broker5:8080"));
 
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
 
         var expected = new HashSet<UnloadDecision>();
-        expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+        expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
                 Success, Overloaded));
-        expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")),
+        expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
                 Success, Underloaded));
         assertEquals(res, expected);
         assertEquals(counter.getLoadAvg(), 0.26400000000000007);
@@ -1136,17 +1137,17 @@ public class TransferShedderTest {
         var ctx = setupContext();
         var brokerLoadDataStore = ctx.brokerLoadDataStore();
 
-        var load = getCpuLoad(ctx,  3 , "broker2");
-        brokerLoadDataStore.pushAsync("broker2", load);
-        brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx,  55, "broker4"));
-        brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx,  65, "broker5"));
+        var load = getCpuLoad(ctx,  3 , "broker2:8080");
+        brokerLoadDataStore.pushAsync("broker2:8080", load);
+        brokerLoadDataStore.pushAsync("broker4:8080", getCpuLoad(ctx,  55, "broker4:8080"));
+        brokerLoadDataStore.pushAsync("broker5:8080", getCpuLoad(ctx,  65, "broker5:8080"));
 
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
 
         var expected = new HashSet<UnloadDecision>();
-        expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+        expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
                 Success, Overloaded));
-        expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")),
+        expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
                 Success, Underloaded));
         assertEquals(res, expected);
         assertEquals(counter.getLoadAvg(), 0.262);
@@ -1162,9 +1163,9 @@ public class TransferShedderTest {
                 .setLoadBalancerMaxNumberOfBrokerSheddingPerCycle(10);
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
         var expected = new HashSet<UnloadDecision>();
-        expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+        expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
                 Success, Overloaded));
-        expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")),
+        expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
                 Success, Overloaded));
         assertEquals(res, expected);
         assertEquals(counter.getLoadAvg(), setupLoadAvg);
@@ -1188,9 +1189,9 @@ public class TransferShedderTest {
         }
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
         var expected = new HashSet<UnloadDecision>();
-        expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+        expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
                 Success, Overloaded));
-        expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")),
+        expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
                 Success, Overloaded));
         assertEquals(res, expected);
         assertEquals(counter.getLoadAvg(), setupLoadAvg);
@@ -1203,13 +1204,13 @@ public class TransferShedderTest {
         TransferShedder transferShedder = new TransferShedder(counter);
         var ctx = setupContext();
         var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
-        topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("my-tenant/my-namespaceE", 2000000, 3000000));
+        topBundlesLoadDataStore.pushAsync("broker5:8080", getTopBundlesLoad("my-tenant/my-namespaceE", 2000000, 3000000));
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
 
         var expected = new HashSet<UnloadDecision>();
-        expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+        expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
                 Success, Overloaded));
-        expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")),
+        expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
                 Success, Overloaded));
         assertEquals(res, expected);
         assertEquals(counter.getLoadAvg(), setupLoadAvg);
@@ -1223,14 +1224,14 @@ public class TransferShedderTest {
         var ctx = setupContext();
 
         var brokerLoadDataStore = ctx.brokerLoadDataStore();
-        brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx,  200, "broker4"));
-        brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx,  1000, "broker5"));
+        brokerLoadDataStore.pushAsync("broker4:8080", getCpuLoad(ctx,  200, "broker4:8080"));
+        brokerLoadDataStore.pushAsync("broker5:8080", getCpuLoad(ctx,  1000, "broker5:8080"));
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
 
         var expected = new HashSet<UnloadDecision>();
-        expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+        expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
                 Success, Overloaded));
-        expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")),
+        expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
                 Success, Overloaded));
         assertEquals(res, expected);
         assertEquals(counter.getLoadAvg(), 2.4240000000000004);
@@ -1264,13 +1265,16 @@ public class TransferShedderTest {
         TransferShedder transferShedder = new TransferShedder(counter);
         var ctx = setupContextLoadSkewedOverload(100);
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
-        var expected = new HashSet<UnloadDecision>();
-        expected.add(new UnloadDecision(
-                new Unload("broker99", "my-tenant/my-namespace99/0x00000000_0x0FFFFFFF",
-                        Optional.of("broker52")), Success, Overloaded));
-        assertEquals(res, expected);
-        assertEquals(counter.getLoadAvg(), 0.019900000000000008);
-        assertEquals(counter.getLoadStd(), 0.09850375627355534);
+        Assertions.assertThat(res).isIn(
+                Set.of(new UnloadDecision(
+                        new Unload("broker99:8080", "my-tenant/my-namespace99/0x00000000_0x0FFFFFFF",
+                                Optional.of("broker52:8080")), Success, Overloaded)),
+                Set.of(new UnloadDecision(
+                        new Unload("broker99:8080", "my-tenant/my-namespace99/0x00000000_0x0FFFFFFF",
+                                Optional.of("broker83:8080")), Success, Overloaded))
+        );
+        assertEquals(counter.getLoadAvg(), 0.019900000000000008, 0.00001);
+        assertEquals(counter.getLoadStd(), 0.09850375627355534, 0.00001);
     }
 
     @Test
@@ -1281,11 +1285,11 @@ public class TransferShedderTest {
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
         var expected = new HashSet<UnloadDecision>();
         expected.add(new UnloadDecision(
-                new Unload("broker98", "my-tenant/my-namespace98/0x00000000_0x0FFFFFFF",
-                        Optional.of("broker99")), Success, Underloaded));
+                new Unload("broker98:8080", "my-tenant/my-namespace98/0x00000000_0x0FFFFFFF",
+                        Optional.of("broker99:8080")), Success, Underloaded));
         assertEquals(res, expected);
-        assertEquals(counter.getLoadAvg(), 0.9704000000000005);
-        assertEquals(counter.getLoadStd(), 0.09652895938523735);
+        assertEquals(counter.getLoadAvg(), 0.9704000000000005, 0.00001);
+        assertEquals(counter.getLoadStd(), 0.09652895938523735, 0.00001);
     }
 
     @Test
@@ -1301,13 +1305,13 @@ public class TransferShedderTest {
             double[] loads = new double[numBrokers];
             final Map<String, BrokerLookupData> availableBrokers = new HashMap<>();
             for (int i = 0; i < loads.length; i++) {
-                availableBrokers.put("broker" + i, mock(BrokerLookupData.class));
+                availableBrokers.put("broker" + i + ":8080", mock(BrokerLookupData.class));
             }
             stats.update(loadStore, availableBrokers, Map.of(), conf);
 
             var brokerLoadDataStore = ctx.brokerLoadDataStore();
             for (int i = 0; i < loads.length; i++) {
-                loads[i] = loadStore.get("broker" + i).get().getWeightedMaxEMA();
+                loads[i] = loadStore.get("broker" + i + ":8080").get().getWeightedMaxEMA();
             }
             int i = 0;
             int j = loads.length - 1;
@@ -1342,8 +1346,8 @@ public class TransferShedderTest {
         var conf = ctx.brokerConfiguration();
         final Map<String, BrokerLookupData> availableBrokers = new HashMap<>();
         for (int i = 0; i < loads.length; i++) {
-            availableBrokers.put("broker" + i, mock(BrokerLookupData.class));
-            loadStore.pushAsync("broker" + i, getCpuLoad(ctx,  loads[i], "broker" + i));
+            availableBrokers.put("broker" + i + ":8080", mock(BrokerLookupData.class));
+            loadStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx,  loads[i], "broker" + i + ":8080"));
         }
         stats.update(loadStore, availableBrokers, Map.of(), conf);
 
@@ -1361,8 +1365,8 @@ public class TransferShedderTest {
         var conf = ctx.brokerConfiguration();
         final Map<String, BrokerLookupData> availableBrokers = new HashMap<>();
         for (int i = 0; i < loads.length; i++) {
-            availableBrokers.put("broker" + i, mock(BrokerLookupData.class));
-            loadStore.pushAsync("broker" + i, getCpuLoad(ctx,  loads[i], "broker" + i));
+            availableBrokers.put("broker" + i + ":8080", mock(BrokerLookupData.class));
+            loadStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx,  loads[i], "broker" + i + ":8080"));
         }
         stats.update(loadStore, availableBrokers, Map.of(), conf);
         assertEquals(stats.avg(), 3.9449999999999994);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index d48a56491b8..b924a59bf7d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -23,7 +23,6 @@ import static org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIM
 import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -31,6 +30,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.Range;
@@ -84,16 +84,17 @@ import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.PortManager;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
-import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
-import org.apache.pulsar.policies.data.loadbalancer.BundleData;
 import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -120,12 +121,9 @@ public class ModularLoadManagerImplTest {
 
     private PulsarService pulsar3;
 
-    private String primaryHost;
+    private String primaryBrokerId;
 
-    private String primaryTlsHost;
-    private String secondaryHost;
-
-    private String secondaryTlsHost;
+    private String secondaryBrokerId;
 
     private NamespaceBundleFactory nsFactory;
 
@@ -183,8 +181,7 @@ public class ModularLoadManagerImplTest {
         pulsar1 = new PulsarService(config1);
         pulsar1.start();
 
-        primaryHost = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTP().get());
-        primaryTlsHost = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTPS().get());
+        primaryBrokerId = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTPS().get());
         url1 = new URL(pulsar1.getWebServiceAddress());
         admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
 
@@ -218,8 +215,7 @@ public class ModularLoadManagerImplTest {
         config.setBrokerServicePortTls(Optional.of(0));
         pulsar3 = new PulsarService(config);
 
-        secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get());
-        secondaryTlsHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTPS().get());
+        secondaryBrokerId = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTPS().get());
         url2 = new URL(pulsar2.getWebServiceAddress());
         admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
 
@@ -239,7 +235,7 @@ public class ModularLoadManagerImplTest {
 
         pulsar2.close();
         pulsar1.close();
-        
+
         if (pulsar3.isRunning()) {
             pulsar3.close();
         }
@@ -270,7 +266,7 @@ public class ModularLoadManagerImplTest {
         for (int i = 0; i < 2; ++i) {
             final ServiceUnitId serviceUnit = makeBundle(Integer.toString(i));
             final String broker = primaryLoadManager.selectBrokerForAssignment(serviceUnit).get();
-            if (broker.equals(primaryHost)) {
+            if (broker.equals(primaryBrokerId)) {
                 foundFirst = true;
             } else {
                 foundSecond = true;
@@ -286,12 +282,12 @@ public class ModularLoadManagerImplTest {
         LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData");
 
         // Make sure the second broker is not in the internal map.
-        Awaitility.await().untilAsserted(() -> assertFalse(loadData.getBrokerData().containsKey(secondaryHost)));
+        Awaitility.await().untilAsserted(() -> assertFalse(loadData.getBrokerData().containsKey(secondaryBrokerId)));
 
         // Try 5 more selections, ensure they all go to the first broker.
         for (int i = 2; i < 7; ++i) {
             final ServiceUnitId serviceUnit = makeBundle(Integer.toString(i));
-            assertEquals(primaryLoadManager.selectBrokerForAssignment(serviceUnit), primaryHost);
+            assertEquals(primaryLoadManager.selectBrokerForAssignment(serviceUnit), primaryBrokerId);
         }
     }
 
@@ -313,7 +309,7 @@ public class ModularLoadManagerImplTest {
         // one bundle.
         pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class).create(firstBundleDataPath, bundleData).join();
         for (final NamespaceBundle bundle : bundles) {
-            if (primaryLoadManager.selectBrokerForAssignment(bundle).equals(primaryHost)) {
+            if (primaryLoadManager.selectBrokerForAssignment(bundle).equals(primaryBrokerId)) {
                 ++numAssignedToPrimary;
             } else {
                 ++numAssignedToSecondary;
@@ -327,52 +323,52 @@ public class ModularLoadManagerImplTest {
     }
 
 
-    
+
     @Test
     public void testBrokerAffinity() throws Exception {
         // Start broker 3
         pulsar3.start();
-        
+
         final String tenant = "test";
         final String cluster = "test";
         String namespace = tenant + "/" + cluster + "/" + "test";
         String topic = "persistent://" + namespace + "/my-topic1";
-        admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
+        admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build());
         admin1.tenants().createTenant(tenant,
                 new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
         admin1.namespaces().createNamespace(namespace, 16);
-        
+
         String topicLookup = admin1.lookups().lookupTopic(topic);
         String bundleRange = admin1.lookups().getBundleRange(topic);
-        
+
         String brokerServiceUrl = pulsar1.getBrokerServiceUrl();
-        String brokerUrl = pulsar1.getSafeWebServiceAddress();
+        String brokerId = pulsar1.getBrokerId();
         log.debug("initial broker service url - {}", topicLookup);
         Random rand=new Random();
-        
+
         if (topicLookup.equals(brokerServiceUrl)) {
             int x = rand.nextInt(2);
             if (x == 0) {
-                brokerUrl = pulsar2.getSafeWebServiceAddress();
+                brokerId = pulsar2.getBrokerId();
                 brokerServiceUrl = pulsar2.getBrokerServiceUrl();
             }
             else {
-                brokerUrl = pulsar3.getSafeWebServiceAddress();
+                brokerId = pulsar3.getBrokerId();
                 brokerServiceUrl = pulsar3.getBrokerServiceUrl();
             }
         }
-        brokerUrl = brokerUrl.replaceFirst("http[s]?://", "");
-        log.debug("destination broker service url - {}, broker url - {}", brokerServiceUrl, brokerUrl);
-        String leaderServiceUrl = admin1.brokers().getLeaderBroker().getServiceUrl();
-        log.debug("leader serviceUrl - {}, broker1 service url - {}", leaderServiceUrl, pulsar1.getSafeWebServiceAddress());
-        //Make a call to broker which is not a leader
-        if (!leaderServiceUrl.equals(pulsar1.getSafeWebServiceAddress())) {
-            admin1.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerUrl);
+        log.debug("destination broker service url - {}, broker url - {}", brokerServiceUrl, brokerId);
+        String leaderBrokerId = admin1.brokers().getLeaderBroker().getBrokerId();
+        log.debug("leader lookup address - {}, broker1 lookup address - {}", leaderBrokerId,
+                pulsar1.getBrokerId());
+        // Make a call to broker which is not a leader
+        if (!leaderBrokerId.equals(pulsar1.getBrokerId())) {
+            admin1.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerId);
         }
         else {
-            admin2.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerUrl);
+            admin2.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerId);
         }
-        
+
         sleep(2000);
         String topicLookupAfterUnload = admin1.lookups().lookupTopic(topic);
         log.debug("final broker service url - {}", topicLookupAfterUnload);
@@ -444,9 +440,9 @@ public class ModularLoadManagerImplTest {
         pulsar1.getConfiguration().setLoadBalancerEnabled(true);
         final LoadData loadData = (LoadData) getField(primaryLoadManagerSpy, "loadData");
         final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
-        final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryTlsHost));
+        final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryBrokerId));
         when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData);
-        brokerDataMap.put(primaryTlsHost, brokerDataSpy1);
+        brokerDataMap.put(primaryBrokerId, brokerDataSpy1);
         // Need to update all the bundle data for the shredder to see the spy.
         primaryLoadManagerSpy.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080"));
 
@@ -464,7 +460,7 @@ public class ModularLoadManagerImplTest {
         verify(namespacesSpy1, Mockito.times(1))
                 .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
         assertEquals(bundleReference.get(), mockBundleName(2));
-        assertEquals(selectedBrokerRef.get().get(), secondaryTlsHost);
+        assertEquals(selectedBrokerRef.get().get(), secondaryBrokerId);
 
         primaryLoadManagerSpy.doLoadShedding();
         // Now less expensive bundle will be unloaded (normally other bundle would move off and nothing would be
@@ -472,13 +468,13 @@ public class ModularLoadManagerImplTest {
         verify(namespacesSpy1, Mockito.times(2))
                 .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
         assertEquals(bundleReference.get(), mockBundleName(1));
-        assertEquals(selectedBrokerRef.get().get(), secondaryTlsHost);
+        assertEquals(selectedBrokerRef.get().get(), secondaryBrokerId);
 
         primaryLoadManagerSpy.doLoadShedding();
         // Now both are in grace period: neither should be unloaded.
         verify(namespacesSpy1, Mockito.times(2))
                 .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
-        assertEquals(selectedBrokerRef.get().get(), secondaryTlsHost);
+        assertEquals(selectedBrokerRef.get().get(), secondaryBrokerId);
 
         // Test bundle transfer to same broker
 
@@ -487,13 +483,11 @@ public class ModularLoadManagerImplTest {
         verify(namespacesSpy1, Mockito.times(3))
                 .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
 
-        doReturn(Optional.of(primaryHost)).when(primaryLoadManagerSpy).selectBroker(any());
         loadData.getRecentlyUnloadedBundles().clear();
         primaryLoadManagerSpy.doLoadShedding();
         // The bundle shouldn't be unloaded because the broker is the same.
         verify(namespacesSpy1, Mockito.times(4))
                 .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
-
     }
 
     // Test that ModularLoadManagerImpl will determine that writing local data to ZooKeeper is necessary if certain
@@ -610,10 +604,12 @@ public class ModularLoadManagerImplTest {
         final String tenant = "my-property";
         final String cluster = "use";
         final String namespace = "my-ns";
-        final String broker1Address = pulsar1.getAdvertisedAddress() + "0";
-        final String broker2Address = pulsar2.getAdvertisedAddress() + "1";
-        final String sharedBroker = "broker3";
-        admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
+        String broker1Host = pulsar1.getAdvertisedAddress() + "0";
+        final String broker1Address = broker1Host + ":8080";
+        String broker2Host = pulsar2.getAdvertisedAddress() + "1";
+        final String broker2Address = broker2Host + ":8080";
+        final String sharedBroker = "broker3:8080";
+        admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build());
         admin1.tenants().createTenant(tenant,
                 new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
         admin1.namespaces().createNamespace(tenant + "/" + cluster + "/" + namespace);
@@ -621,8 +617,8 @@ public class ModularLoadManagerImplTest {
         // set a new policy
         String newPolicyJsonTemplate = "{\"namespaces\":[\"%s/%s/%s.*\"],\"primary\":[\"%s\"],"
                 + "\"secondary\":[\"%s\"],\"auto_failover_policy\":{\"policy_type\":\"min_available\",\"parameters\":{\"min_limit\":%s,\"usage_threshold\":80}}}";
-        String newPolicyJson = String.format(newPolicyJsonTemplate, tenant, cluster, namespace, broker1Address,
-                broker2Address, 1);
+        String newPolicyJson = String.format(newPolicyJsonTemplate, tenant, cluster, namespace, broker1Host,
+                broker2Host, 1);
         String newPolicyName = "my-ns-isolation-policies";
         ObjectMapper jsonMapper = ObjectMapperFactory.create();
         NamespaceIsolationDataImpl nsPolicyData = jsonMapper.readValue(newPolicyJson.getBytes(),
@@ -634,12 +630,12 @@ public class ModularLoadManagerImplTest {
         ServiceUnitId serviceUnit = LoadBalancerTestingUtils.makeBundles(nsFactory, tenant, cluster, namespace, 1)[0];
         BrokerTopicLoadingPredicate brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
             @Override
-            public boolean isEnablePersistentTopics(String brokerUrl) {
+            public boolean isEnablePersistentTopics(String brokerId) {
                 return true;
             }
 
             @Override
-            public boolean isEnableNonPersistentTopics(String brokerUrl) {
+            public boolean isEnableNonPersistentTopics(String brokerId) {
                 return true;
             }
         };
@@ -671,8 +667,8 @@ public class ModularLoadManagerImplTest {
 
         // (2) now we will have isolation policy : primary=broker1, secondary=broker2, minLimit=2
 
-        newPolicyJson = String.format(newPolicyJsonTemplate, tenant, cluster, namespace, broker1Address,
-                broker2Address, 2);
+        newPolicyJson = String.format(newPolicyJsonTemplate, tenant, cluster, namespace, broker1Host,
+                broker2Host, 2);
         nsPolicyData = jsonMapper.readValue(newPolicyJson.getBytes(), NamespaceIsolationDataImpl.class);
         admin1.clusters().createNamespaceIsolationPolicy("use", newPolicyName, nsPolicyData);
 
@@ -709,10 +705,12 @@ public class ModularLoadManagerImplTest {
         final String tenant = "my-tenant";
         final String namespace = "my-tenant/use/my-ns";
         final String bundle = "0x00000000_0xffffffff";
-        final String brokerAddress = pulsar1.getAdvertisedAddress();
-        final String broker1Address = pulsar1.getAdvertisedAddress() + 1;
+        final String brokerHost = pulsar1.getAdvertisedAddress();
+        final String brokerAddress = brokerHost  + ":8080";
+        final String broker1Host = pulsar1.getAdvertisedAddress() + "1";
+        final String broker1Address = broker1Host + ":8080";
 
-        admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
+        admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build());
         admin1.tenants().createTenant(tenant,
                 new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
         admin1.namespaces().createNamespace(namespace);
@@ -727,12 +725,13 @@ public class ModularLoadManagerImplTest {
         loadManager.updateAll();
 
         // test1: no isolation policy
-        assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace, bundle, primaryHost));
+        assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace, bundle, primaryBrokerId));
 
         // test2: as isolation policy, there are not another broker to load the bundle.
         String newPolicyJsonTemplate = "{\"namespaces\":[\"%s.*\"],\"primary\":[\"%s\"],"
                 + "\"secondary\":[\"%s\"],\"auto_failover_policy\":{\"policy_type\":\"min_available\",\"parameters\":{\"min_limit\":%s,\"usage_threshold\":80}}}";
-        String newPolicyJson = String.format(newPolicyJsonTemplate, namespace, broker1Address,broker1Address, 1);
+
+        String newPolicyJson = String.format(newPolicyJsonTemplate, namespace, broker1Host, broker1Host, 1);
         String newPolicyName = "my-ns-isolation-policies";
         ObjectMapper jsonMapper = ObjectMapperFactory.create();
         NamespaceIsolationDataImpl nsPolicyData = jsonMapper.readValue(newPolicyJson.getBytes(),
@@ -741,11 +740,11 @@ public class ModularLoadManagerImplTest {
         assertFalse(loadManager.shouldNamespacePoliciesUnload(namespace, bundle, broker1Address));
 
         // test3: as isolation policy, there are another can load the bundle.
-        String newPolicyJson1 = String.format(newPolicyJsonTemplate, namespace, brokerAddress,brokerAddress, 1);
+        String newPolicyJson1 = String.format(newPolicyJsonTemplate, namespace, brokerHost, brokerHost, 1);
         NamespaceIsolationDataImpl nsPolicyData1 = jsonMapper.readValue(newPolicyJson1.getBytes(),
                 NamespaceIsolationDataImpl.class);
         admin1.clusters().updateNamespaceIsolationPolicy(cluster, newPolicyName, nsPolicyData1);
-        assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace, bundle, primaryHost));
+        assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace, bundle, primaryBrokerId));
 
         producer.close();
     }
@@ -762,7 +761,7 @@ public class ModularLoadManagerImplTest {
         ServiceConfiguration config = new ServiceConfiguration();
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName("use");
-        config.setWebServicePort(Optional.of(0));
+        config.setWebServicePort(Optional.of(PortManager.nextLockedFreePort()));
         config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -770,10 +769,12 @@ public class ModularLoadManagerImplTest {
         PulsarService pulsar = new PulsarService(config);
         // create znode using different zk-session
         final String brokerZnode = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getAdvertisedAddress() + ":"
-                + config.getWebServicePort();
-        pulsar1.getLocalMetadataStore().put(brokerZnode, new byte[0], Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
+                + config.getWebServicePort().get();
+        pulsar1.getLocalMetadataStore()
+                .put(brokerZnode, new byte[0], Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
         try {
             pulsar.start();
+            fail("should have failed");
         } catch (PulsarServerException e) {
             //Ok.
         }
@@ -812,7 +813,7 @@ public class ModularLoadManagerImplTest {
         final String tenant = "my-tenant";
         final String namespace = "my-ns";
         NamespaceName ns = isV1 ? NamespaceName.get(tenant, cluster, namespace) : NamespaceName.get(tenant, namespace);
-        admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
+        admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build());
         admin1.tenants().createTenant(tenant,
                 new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
         admin1.namespaces().createNamespace(ns.toString(), 16);
@@ -861,7 +862,7 @@ public class ModularLoadManagerImplTest {
         final String topicName = tenant + "/" + namespace + "/" + "topic";
         int bundleNumbers = 8;
 
-        admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
+        admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build());
         admin1.tenants().createTenant(tenant,
                 new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
         admin1.namespaces().createNamespace(tenant + "/" + namespace, bundleNumbers);
@@ -911,7 +912,6 @@ public class ModularLoadManagerImplTest {
 
         final Optional<ResourceUnit> leastLoaded = loadManagerWrapper.getLeastLoaded(bundleWillBeSplit);
         assertFalse(leastLoaded.isEmpty());
-        assertTrue(leastLoaded.get().getResourceId().startsWith("https"));
 
         String bundleDataPath = BUNDLE_DATA_BASE_PATH + "/" + tenant + "/" + namespace;
         CompletableFuture<List<String>> children = bundlesCache.getChildren(bundleDataPath);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index c22e49e5fea..a0313ef7436 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -353,14 +353,14 @@ public class NamespaceServiceTest extends BrokerTestBase {
     @Test
     public void testLoadReportDeserialize() throws Exception {
 
-        final String candidateBroker1 = "http://localhost:8000";
-        final String candidateBroker2 = "http://localhost:3000";
-        LoadReport lr = new LoadReport(null, null, candidateBroker1, null);
-        LocalBrokerData ld = new LocalBrokerData(null, null, candidateBroker2, null);
-        URI uri1 = new URI(candidateBroker1);
-        URI uri2 = new URI(candidateBroker2);
-        String path1 = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri1.getHost(), uri1.getPort());
-        String path2 = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri2.getHost(), uri2.getPort());
+        final String candidateBroker1 = "localhost:8000";
+        String broker1Url = "pulsar://localhost:6650";
+        final String candidateBroker2 = "localhost:3000";
+        String broker2Url = "pulsar://localhost:6660";
+        LoadReport lr = new LoadReport("http://" + candidateBroker1, null, broker1Url, null);
+        LocalBrokerData ld = new LocalBrokerData("http://" + candidateBroker2, null, broker2Url, null);
+        String path1 = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, candidateBroker1);
+        String path2 = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, candidateBroker2);
 
         pulsar.getLocalMetadataStore().put(path1,
                 ObjectMapperFactory.getMapper().writer().writeValueAsBytes(lr),
@@ -379,23 +379,23 @@ public class NamespaceServiceTest extends BrokerTestBase {
                 .getAndSet(new ModularLoadManagerWrapper(new ModularLoadManagerImpl()));
         oldLoadManager.stop();
         LookupResult result2 = pulsar.getNamespaceService().createLookupResult(candidateBroker2, false, null).get();
-        Assert.assertEquals(result1.getLookupData().getBrokerUrl(), candidateBroker1);
-        Assert.assertEquals(result2.getLookupData().getBrokerUrl(), candidateBroker2);
+        Assert.assertEquals(result1.getLookupData().getBrokerUrl(), broker1Url);
+        Assert.assertEquals(result2.getLookupData().getBrokerUrl(), broker2Url);
         System.out.println(result2);
     }
 
     @Test
     public void testCreateLookupResult() throws Exception {
 
-        final String candidateBroker = "pulsar://localhost:6650";
+        final String candidateBroker = "localhost:8080";
+        final String brokerUrl = "pulsar://localhost:6650";
         final String listenerUrl = "pulsar://localhost:7000";
         final String listenerUrlTls = "pulsar://localhost:8000";
         final String listener = "listenerName";
         Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
         advertisedListeners.put(listener, AdvertisedListener.builder().brokerServiceUrl(new URI(listenerUrl)).brokerServiceUrlTls(new URI(listenerUrlTls)).build());
-        LocalBrokerData ld = new LocalBrokerData(null, null, candidateBroker, null, advertisedListeners);
-        URI uri = new URI(candidateBroker);
-        String path = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri.getHost(), uri.getPort());
+        LocalBrokerData ld = new LocalBrokerData("http://" + candidateBroker, null, brokerUrl, null, advertisedListeners);
+        String path = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, candidateBroker);
 
         pulsar.getLocalMetadataStore().put(path,
                 ObjectMapperFactory.getMapper().writer().writeValueAsBytes(ld),
@@ -405,7 +405,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
         LookupResult noListener = pulsar.getNamespaceService().createLookupResult(candidateBroker, false, null).get();
         LookupResult withListener = pulsar.getNamespaceService().createLookupResult(candidateBroker, false, listener).get();
 
-        Assert.assertEquals(noListener.getLookupData().getBrokerUrl(), candidateBroker);
+        Assert.assertEquals(noListener.getLookupData().getBrokerUrl(), brokerUrl);
         Assert.assertEquals(withListener.getLookupData().getBrokerUrl(), listenerUrl);
         Assert.assertEquals(withListener.getLookupData().getBrokerUrlTls(), listenerUrlTls);
         System.out.println(withListener);
@@ -687,7 +687,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
 
     @Test
     public void testHeartbeatNamespaceMatch() throws Exception {
-        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), conf);
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespace(pulsar.getBrokerId(), conf);
         NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName);
         assertTrue(NamespaceService.isSystemServiceNamespace(
                         NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
@@ -707,7 +707,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
         Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
         loadManagerField.setAccessible(true);
         doReturn(true).when(loadManager).isCentralized();
-        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getSafeWebServiceAddress(), null);
+        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getBrokerId(), null);
         Optional<ResourceUnit> res = Optional.of(resourceUnit);
         doReturn(res).when(loadManager).getLeastLoaded(any(ServiceUnitId.class));
         loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager));
@@ -860,10 +860,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
 
     public CompletableFuture<Void> registryBrokerDataChangeNotice() {
         CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-        String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
-                + (conf.getWebServicePort().isPresent() ? conf.getWebServicePort().get()
-                : conf.getWebServicePortTls().get());
-        String brokerDataPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+        String brokerDataPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getBrokerId();
         pulsar.getLocalMetadataStore().registerListener(notice -> {
             if (brokerDataPath.equals(notice.getPath())){
                 if (!completableFuture.isDone()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index f456a133d99..3600850974c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1534,9 +1534,9 @@ public class BrokerServiceTest extends BrokerTestBase {
         assertTrue(brokerService.isSystemTopic(TRANSACTION_COORDINATOR_ASSIGN));
         assertTrue(brokerService.isSystemTopic(TRANSACTION_COORDINATOR_LOG));
         NamespaceName heartbeatNamespaceV1 = NamespaceService
-                .getHeartbeatNamespace(pulsar.getLookupServiceAddress(), pulsar.getConfig());
+                .getHeartbeatNamespace(pulsar.getBrokerId(), pulsar.getConfig());
         NamespaceName heartbeatNamespaceV2 = NamespaceService
-                .getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(), pulsar.getConfig());
+                .getHeartbeatNamespaceV2(pulsar.getBrokerId(), pulsar.getConfig());
         assertTrue(brokerService.isSystemTopic("persistent://" + heartbeatNamespaceV1.toString() + "/healthcheck"));
         assertTrue(brokerService.isSystemTopic(heartbeatNamespaceV2.toString() + "/healthcheck"));
     }
@@ -1726,11 +1726,11 @@ public class BrokerServiceTest extends BrokerTestBase {
     }
 
     @Test
-    public void testGetLookupServiceAddress() throws Exception {
+    public void testGetBrokerId() throws Exception {
         cleanup();
-        setup();
         conf.setWebServicePortTls(Optional.of(8081));
-        assertEquals(pulsar.getLookupServiceAddress(), "localhost:8081");
+        setup();
+        assertEquals(pulsar.getBrokerId(), "localhost:8081");
         resetState();
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index 84c4670f2bb..1549ba8d81c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -599,11 +599,11 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
         super.baseSetup();
         // init topic
         NamespaceName heartbeatNamespaceV1 = NamespaceService
-                .getHeartbeatNamespace(pulsar.getLookupServiceAddress(), pulsar.getConfig());
+                .getHeartbeatNamespace(pulsar.getBrokerId(), pulsar.getConfig());
         final String healthCheckTopicV1 = "persistent://" + heartbeatNamespaceV1 + "/healthcheck";
 
         NamespaceName heartbeatNamespaceV2 = NamespaceService
-                .getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(), pulsar.getConfig());
+                .getHeartbeatNamespaceV2(pulsar.getBrokerId(), pulsar.getConfig());
         final String healthCheckTopicV2 = "persistent://" + heartbeatNamespaceV2 + "/healthcheck";
 
         admin.brokers().healthcheck(TopicVersion.V1);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 416d7ed0270..a2401ebe19a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -165,7 +165,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
 
     @Test
     public void testHealthCheckTopicNotOffload() throws Exception {
-        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
                 pulsar.getConfig());
         TopicName topicName = TopicName.get("persistent", namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
         PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
@@ -185,7 +185,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
     @Test
     public void testSystemNamespaceNotCreateChangeEventsTopic() throws Exception {
         admin.brokers().healthcheck(TopicVersion.V2);
-        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
                 pulsar.getConfig());
         TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
         Optional<Topic> optionalTopic = pulsar.getBrokerService()
@@ -203,7 +203,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
     @Test
     public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
         admin.brokers().healthcheck(TopicVersion.V2);
-        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
                 pulsar.getConfig());
         TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
         for (int partition = 0; partition < PARTITIONS; partition ++) {
@@ -218,7 +218,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
     @Test
     public void testHeartbeatTopicBeDeleted() throws Exception {
         admin.brokers().healthcheck(TopicVersion.V2);
-        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
                 pulsar.getConfig());
         TopicName heartbeatTopicName = TopicName.get("persistent", namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
 
@@ -230,11 +230,11 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
         Assert.assertEquals(topics.size(), 0);
     }
-  
+
     @Test
     public void testHeartbeatNamespaceNotCreateTransactionInternalTopic() throws Exception {
         admin.brokers().healthcheck(TopicVersion.V2);
-        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
                 pulsar.getConfig());
         TopicName topicName = TopicName.get("persistent",
                 namespaceName, SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index 2611f0d8969..4b5af5e595c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
+import org.apache.pulsar.common.util.PortManager;
 import org.apache.pulsar.compaction.CompactionServiceFactory;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
@@ -157,6 +158,8 @@ public class PulsarTestContext implements AutoCloseable {
 
     private final boolean startable;
 
+    private final boolean preallocatePorts;
+
 
     public ManagedLedgerFactory getManagedLedgerFactory() {
         return managedLedgerClientFactory.getManagedLedgerFactory();
@@ -228,7 +231,9 @@ public class PulsarTestContext implements AutoCloseable {
         protected SpyConfig.Builder spyConfigBuilder = SpyConfig.builder(SpyConfig.SpyType.NONE);
         protected Consumer<PulsarService> pulsarServiceCustomizer;
         protected ServiceConfiguration svcConfig = initializeConfig();
-        protected Consumer<ServiceConfiguration> configOverrideCustomizer = this::defaultOverrideServiceConfiguration;
+        protected Consumer<ServiceConfiguration> configOverrideCustomizer;
+
+        protected boolean configOverrideCalled = false;
         protected Function<BrokerService, BrokerService> brokerServiceCustomizer = Function.identity();
         protected PulsarTestContext otherContextToClose;
 
@@ -354,6 +359,7 @@ public class PulsarTestContext implements AutoCloseable {
          */
         public Builder configOverride(Consumer<ServiceConfiguration> configOverrideCustomizer) {
             this.configOverrideCustomizer = configOverrideCustomizer;
+            this.configOverrideCalled = true;
             return this;
         }
 
@@ -538,6 +544,12 @@ public class PulsarTestContext implements AutoCloseable {
             if (super.config == null) {
                 config(svcConfig);
             }
+            handlePreallocatePorts(super.config);
+            if (configOverrideCustomizer != null || !configOverrideCalled) {
+                // call defaultOverrideServiceConfiguration if configOverrideCustomizer
+                // isn't explicitly set to null with `.configOverride(null)` call
+                defaultOverrideServiceConfiguration(super.config);
+            }
             if (configOverrideCustomizer != null) {
                 configOverrideCustomizer.accept(super.config);
             }
@@ -562,6 +574,37 @@ public class PulsarTestContext implements AutoCloseable {
             return super.build();
         }
 
+        protected void handlePreallocatePorts(ServiceConfiguration config) {
+            if (super.preallocatePorts) {
+                config.getBrokerServicePort().ifPresent(portNumber -> {
+                    if (portNumber == 0) {
+                        config.setBrokerServicePort(Optional.of(PortManager.nextLockedFreePort()));
+                    }
+                });
+                config.getBrokerServicePortTls().ifPresent(portNumber -> {
+                    if (portNumber == 0) {
+                        config.setBrokerServicePortTls(Optional.of(PortManager.nextLockedFreePort()));
+                    }
+                });
+                config.getWebServicePort().ifPresent(portNumber -> {
+                    if (portNumber == 0) {
+                        config.setWebServicePort(Optional.of(PortManager.nextLockedFreePort()));
+                    }
+                });
+                config.getWebServicePortTls().ifPresent(portNumber -> {
+                    if (portNumber == 0) {
+                        config.setWebServicePortTls(Optional.of(PortManager.nextLockedFreePort()));
+                    }
+                });
+                registerCloseable(() -> {
+                    config.getBrokerServicePort().ifPresent(PortManager::releaseLockedPort);
+                    config.getBrokerServicePortTls().ifPresent(PortManager::releaseLockedPort);
+                    config.getWebServicePort().ifPresent(PortManager::releaseLockedPort);
+                    config.getWebServicePortTls().ifPresent(PortManager::releaseLockedPort);
+                });
+            }
+        }
+
         private void initializeCommonPulsarServices(SpyConfig spyConfig) {
             if (super.bookKeeperClient == null && super.managedLedgerClientFactory == null) {
                 if (super.executor == null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index a632608bf70..cb72c7d42cd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -172,7 +172,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
 
         // mock: return Broker2 as a Least-loaded broker when leader receives request [3]
         doReturn(true).when(loadManager1).isCentralized();
-        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getSafeWebServiceAddress(), null);
+        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getBrokerId(), null);
         doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
         doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
         loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
@@ -305,7 +305,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
 
         // mock: return Broker2 as a Least-loaded broker when leader receives request
         doReturn(true).when(loadManager2).isCentralized();
-        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getSafeWebServiceAddress(), null);
+        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getBrokerId(), null);
         doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
         loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager2));
         /**** started broker-2 ****/
@@ -485,7 +485,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         // request [3]
         doReturn(true).when(loadManager1).isCentralized();
         doReturn(true).when(loadManager2).isCentralized();
-        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddressTls(), null);
+        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getBrokerId(), null);
         doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
         doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
 
@@ -579,7 +579,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));
         // mock: return Broker1 as a Least-loaded broker when leader receives request [3]
         doReturn(true).when(loadManager1).isCentralized();
-        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getSafeWebServiceAddress(), null);
+        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getBrokerId(), null);
         doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
         doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
         loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
@@ -694,7 +694,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
             loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));
             // mock: return Broker1 as a Least-loaded broker when leader receives request [3]
             doReturn(true).when(loadManager1).isCentralized();
-            SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getSafeWebServiceAddress(), null);
+            SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getBrokerId(), null);
             Optional<ResourceUnit> res = Optional.of(resourceUnit);
             doReturn(res).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
             doReturn(res).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 6bc848a90d0..698ab15940b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1778,9 +1778,9 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
     @SneakyThrows
     @Test
     public void testHealthCheckTopicNotCompacted() {
-        NamespaceName heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), pulsar.getConfiguration());
+        NamespaceName heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(pulsar.getBrokerId(), pulsar.getConfiguration());
         String topicV1 = "persistent://" + heartbeatNamespaceV1.toString() + "/healthcheck";
-        NamespaceName heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(), pulsar.getConfiguration());
+        NamespaceName heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(), pulsar.getConfiguration());
         String topicV2 = heartbeatNamespaceV2.toString() + "/healthcheck";
         Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicV1).create();
         Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicV2).create();
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BrokerInfo.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BrokerInfo.java
index 8955fe7a0ac..19e9ff2d15a 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BrokerInfo.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BrokerInfo.java
@@ -25,9 +25,11 @@ import org.apache.pulsar.common.policies.data.impl.BrokerInfoImpl;
  */
 public interface BrokerInfo {
     String getServiceUrl();
+    String getBrokerId();
 
     interface Builder {
         Builder serviceUrl(String serviceUrl);
+        Builder brokerId(String brokerId);
         BrokerInfo build();
     }
 
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BrokerInfoImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BrokerInfoImpl.java
index e4d0a68b50a..d77f693c7cd 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BrokerInfoImpl.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BrokerInfoImpl.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.common.policies.data.BrokerInfo;
 @NoArgsConstructor
 public final class BrokerInfoImpl implements BrokerInfo {
     private String serviceUrl;
+    private String brokerId;
 
     public static BrokerInfoImplBuilder builder() {
         return new BrokerInfoImplBuilder();
@@ -38,14 +39,20 @@ public final class BrokerInfoImpl implements BrokerInfo {
 
     public static class BrokerInfoImplBuilder implements BrokerInfo.Builder {
         private String serviceUrl;
+        private String brokerId;
 
         public BrokerInfoImplBuilder serviceUrl(String serviceUrl) {
             this.serviceUrl = serviceUrl;
             return this;
         }
 
+        public BrokerInfoImplBuilder brokerId(String brokerId) {
+            this.brokerId = brokerId;
+            return this;
+        }
+
         public BrokerInfoImpl build() {
-            return new BrokerInfoImpl(serviceUrl);
+            return new BrokerInfoImpl(serviceUrl, brokerId);
         }
     }
 }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
index 147c5396520..f997532b273 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
@@ -142,7 +142,7 @@ public class ProxyWithExtensibleLoadManagerTest extends MultiBrokerBaseTest {
         var srcBrokerUrl = admin.lookups().lookupTopic(topicName.toString());
         return getAllBrokers().stream().
                 filter(pulsarService -> !Objects.equals(srcBrokerUrl, pulsarService.getBrokerServiceUrl())).
-                map(PulsarService::getLookupServiceAddress).
+                map(PulsarService::getBrokerId).
                 findAny().orElseThrow(() -> new Exception("Could not determine destination broker lookup URL"));
     }