You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/18 01:26:07 UTC

[pulsar] branch master updated: [pulsar-broker] Make non-tls web/broker-service optional (#3501)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e9425c5  [pulsar-broker] Make non-tls web/broker-service optional (#3501)
e9425c5 is described below

commit e9425c52784e9def4b74c8b0fac6c52751f7f28b
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Jun 17 18:26:01 2019 -0700

    [pulsar-broker] Make non-tls web/broker-service optional (#3501)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  1 +
 .../PulsarConfigurationLoaderTest.java             |  4 +-
 .../broker/MessagingServiceShutdownHook.java       |  4 +-
 .../org/apache/pulsar/broker/PulsarService.java    | 30 ++++++++++++--
 .../apache/pulsar/broker/admin/AdminResource.java  |  2 +-
 .../broker/loadbalance/LeaderElectionService.java  |  8 ++--
 .../broker/loadbalance/ModularLoadManager.java     |  8 ++++
 .../pulsar/broker/loadbalance/NoopLoadManager.java |  4 +-
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 23 ++++++++---
 .../impl/ModularLoadManagerWrapper.java            | 12 +++++-
 .../loadbalance/impl/SimpleLoadManagerImpl.java    | 46 +++++++++++++---------
 .../pulsar/broker/namespace/NamespaceService.java  | 10 ++---
 .../pulsar/broker/namespace/OwnershipCache.java    |  6 +--
 .../pulsar/broker/web/PulsarWebResource.java       |  2 +-
 .../org/apache/pulsar/broker/web/WebService.java   |  2 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java | 18 ++++-----
 .../pulsar/broker/admin/v1/V1_AdminApiTest2.java   |  2 +-
 .../AntiAffinityNamespaceGroupTest.java            |  2 +-
 .../broker/namespace/NamespaceServiceTest.java     |  4 +-
 .../broker/namespace/OwnershipCacheTest.java       |  4 +-
 .../broker/service/AdvertisedAddressTest.java      |  8 ++--
 .../pulsar/broker/service/PeerReplicatorTest.java  |  2 +-
 .../pulsar/broker/service/ReplicatorTestBase.java  | 12 +++---
 .../apache/pulsar/broker/web/WebServiceTest.java   |  2 +-
 .../pulsar/client/api/BrokerServiceLookupTest.java | 10 ++---
 .../pulsar/client/api/NonPersistentTopicTest.java  | 12 +++---
 .../pulsar/client/api/ServiceUrlProviderTest.java  | 24 +++++------
 .../websocket/proxy/ProxyPublishConsumeTest.java   |  2 +-
 .../proxy/ProxyPublishConsumeWithoutZKTest.java    |  2 +-
 29 files changed, 163 insertions(+), 103 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 840117d..a0490b6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -108,6 +108,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
         category = CATEGORY_SERVER,
         doc = "The port for serving binary protobuf requests"
     )
+
     private Optional<Integer> brokerServicePort = Optional.of(6650);
     @FieldContext(
         category = CATEGORY_SERVER,
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
index 9e0bfa9..cd7439e 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
@@ -25,8 +25,6 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
-import io.netty.util.internal.PlatformDependent;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -227,4 +225,4 @@ public class PulsarConfigurationLoaderTest {
         @FieldContext(minValue = 1, maxValue = 3)
         long inValidMax = 4;
     }
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
index 6cacaa5..e09a589 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
@@ -48,8 +48,8 @@ public class MessagingServiceShutdownHook extends Thread implements ShutdownServ
     @Override
     public void run() {
         if (service.getConfiguration() != null) {
-            LOG.info("messaging service shutdown hook started, lookup port="
-                    + service.getConfiguration().getWebServicePort().get() + ", broker url=" + service.getBrokerServiceUrl());
+            LOG.info("messaging service shutdown hook started, lookup webservice="
+                    + service.getSafeWebServiceAddress() + ", broker url=" + service.getSafeBrokerServiceUrl());
         }
 
         ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("shutdown-thread"));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f4c258a..2dddcd5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -344,6 +344,14 @@ public class PulsarService implements AutoCloseable {
                 throw new PulsarServerException("Cannot start the service once it was stopped");
             }
 
+            if (!config.getWebServicePort().isPresent() && !config.getWebServicePortTls().isPresent()) {
+                throw new IllegalArgumentException("webServicePort/webServicePortTls must be present");
+            }
+
+            if (!config.getBrokerServicePort().isPresent() && !config.getBrokerServicePortTls().isPresent()) {
+                throw new IllegalArgumentException("brokerServicePort/brokerServicePortTls must be present");
+            }
+            
             // Now we are ready to start services
             localZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
                     config.getZookeeperServers(), config.getZooKeeperSessionTimeoutMillis());
@@ -445,9 +453,15 @@ public class PulsarService implements AutoCloseable {
             // start function worker service if necessary
             this.startWorkerService(brokerService.getAuthenticationService(), brokerService.getAuthorizationService());
 
-            LOG.info("messaging service is ready, bootstrap service on port={}, broker url={}, cluster={}, configs={}",
-                    config.getWebServicePort().get(), brokerServiceUrl, config.getClusterName(),
-                    ReflectionToStringBuilder.toString(config));
+            final String bootstrapMessage = "bootstrap service "
+                    + (config.getWebServicePort().isPresent() ? "port = " + config.getWebServicePort().get() : "")
+                    + (config.getWebServicePortTls().isPresent() ? "tls-port = " + config.getWebServicePortTls() : "")
+                    + (config.getBrokerServicePort().isPresent() ? "broker url= " + brokerServiceUrl : "")
+                    + (config.getBrokerServicePortTls().isPresent() ? "broker url= " + brokerServiceUrlTls : "");
+            LOG.info("messaging service is ready");
+            
+            LOG.info("messaging service is ready, {}, cluster={}, configs={}", bootstrapMessage,
+                    config.getClusterName(), ReflectionToStringBuilder.toString(config));
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
             throw new PulsarServerException(e);
@@ -925,6 +939,10 @@ public class PulsarService implements AutoCloseable {
         return advertisedAddress;
     }
 
+    public String getSafeWebServiceAddress() { // non-TLS address when non-null, otherwise the TLS one
+        return webServiceAddress == null ? webServiceAddressTls : webServiceAddress;
+    }
+    
     public String getWebServiceAddress() {
         return webServiceAddress;
     }
@@ -933,6 +951,10 @@ public class PulsarService implements AutoCloseable {
         return webServiceAddressTls;
     }
 
+    public String getSafeBrokerServiceUrl() { // non-TLS broker URL when non-null, otherwise the TLS one
+        return brokerServiceUrl == null ? brokerServiceUrlTls : brokerServiceUrl;
+    }
+    
     public String getBrokerServiceUrl() {
         return brokerServiceUrl;
     }
@@ -994,7 +1016,7 @@ public class PulsarService implements AutoCloseable {
             // create cluster for function worker service
             try {
                 NamedEntity.checkName(cluster);
-                ClusterData clusterData = new ClusterData(this.getWebServiceAddress(), null /* serviceUrlTls */,
+                ClusterData clusterData = new ClusterData(this.getSafeWebServiceAddress(), null /* serviceUrlTls */,
                         brokerServiceUrl, null /* brokerServiceUrlTls */);
                 this.getGlobalZkCache().getZooKeeper().create(
                         AdminResource.path("clusters", cluster),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 2c87062..b4d4c36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -315,7 +315,7 @@ public abstract class AdminResource extends PulsarWebResource {
     protected void validateBrokerName(String broker) throws MalformedURLException {
         String brokerUrl = String.format("http://%s", broker);
         String brokerUrlTls = String.format("https://%s", broker);
-        if (!brokerUrl.equals(pulsar().getWebServiceAddress())
+        if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
                 && !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
             String[] parts = broker.split(":");
             checkArgument(parts.length == 2, "Invalid broker url %s", broker);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
index 712b234..8e96db4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
@@ -104,7 +104,7 @@ public class LeaderElectionService {
                                 public void run() {
                                     // If the node is deleted, attempt the re-election
                                     log.info("Broker [{}] is calling re-election from the thread",
-                                            pulsar.getWebServiceAddress());
+                                            pulsar.getSafeWebServiceAddress());
                                     elect();
                                 }
                             });
@@ -125,13 +125,13 @@ public class LeaderElectionService {
 
             // If broker comes here it is a follower. Do nothing, wait for the watch to trigger
             log.info("Broker [{}] is the follower now. Waiting for the watch to trigger...",
-                    pulsar.getWebServiceAddress());
+                    pulsar.getSafeWebServiceAddress());
 
         } catch (NoNodeException nne) {
             // There's no leader yet... try to become the leader
             try {
                 // Create the root node and add current broker's URL as its contents
-                LeaderBroker leaderBroker = new LeaderBroker(pulsar.getWebServiceAddress());
+                LeaderBroker leaderBroker = new LeaderBroker(pulsar.getSafeWebServiceAddress());
                 ZkUtils.createFullPathOptimistic(pulsar.getLocalZkCache().getZooKeeper(), ELECTION_ROOT,
                         jsonMapper.writeValueAsBytes(leaderBroker), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
 
@@ -141,7 +141,7 @@ public class LeaderElectionService {
 
                 // Notify the listener that this broker is now the leader so that it can collect usage and start load
                 // manager.
-                log.info("Broker [{}] is the leader now, notifying the listener...", pulsar.getWebServiceAddress());
+                log.info("Broker [{}] is the leader now, notifying the listener...", pulsar.getSafeWebServiceAddress());
                 leaderListener.brokerIsTheLeaderNow();
             } catch (NodeExistsException nee) {
                 // Re-elect the new leader
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
index 369bf5a..658aeba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
@@ -111,4 +111,12 @@ public interface ModularLoadManager {
      * @return
      */
     Set<String> getAvailableBrokers();
+
+    /**
+     * Fetch local-broker data from the load-manager broker cache.
+     *
+     * @param broker broker name; ModularLoadManagerImpl appends it to the load-balance brokers root zk-path
+     * @return the broker's local data; callers should expect null when none is available
+     */
+    LocalBrokerData getBrokerLocalData(String broker);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
index 087eb28..d563177 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
@@ -59,8 +59,8 @@ public class NoopLoadManager implements LoadManager {
                 new PulsarResourceDescription());
         zkClient = pulsar.getZkClient();
 
-        localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
-                pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+        localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+                pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 997d9bb..4d71da6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -263,10 +263,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
         defaultStats.msgRateIn = DEFAULT_MESSAGE_RATE;
         defaultStats.msgRateOut = DEFAULT_MESSAGE_RATE;
 
-        lastData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
-                pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
-        localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
-                pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+        lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+                pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+        localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+                pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
         localData.setBrokerVersionString(pulsar.getBrokerVersion());
         // configure broker-topic mode
         lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
@@ -783,7 +783,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
             // Register the brokers in zk list
             createZPathIfNotExists(zkClient, LoadManager.LOADBALANCE_BROKERS_ROOT);
 
-            String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + conf.getWebServicePort().get();
+            String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
+                    + (conf.getWebServicePort().isPresent() ? conf.getWebServicePort().get()
+                            : conf.getWebServicePortTls().get());
             brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
             final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + lookupServiceAddress;
             updateLocalBrokerData();
@@ -962,4 +964,15 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
             log.warn("Failed to get domain-list for cluster {}", e.getMessage());
         }
     }
+    
+    @Override
+    public LocalBrokerData getBrokerLocalData(String broker) {
+        final String znodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + broker; // cache lookup key
+        try {
+            return brokerDataCache.get(znodePath).orElse(null); // null when the cache has no entry
+        } catch (Exception e) {
+            log.warn("Failed to get local-broker data for {}", broker, e); // failure is reported as null
+            return null;
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index 82d99b6..fa4c4a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 
@@ -68,13 +69,22 @@ public class ModularLoadManagerWrapper implements LoadManager {
     public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
         Optional<String> leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit);
         if (leastLoadedBroker.isPresent()) {
-            return Optional.of(new SimpleResourceUnit(String.format("http://%s", leastLoadedBroker.get()),
+            return Optional.of(new SimpleResourceUnit(getBrokerWebServiceUrl(leastLoadedBroker.get()),
                     new PulsarResourceDescription()));
         } else {
             return Optional.empty();
         }
     }
 
+    private String getBrokerWebServiceUrl(String broker) {
+        LocalBrokerData brokerData = loadManager.getBrokerLocalData(broker);
+        if (brokerData == null) {
+            return String.format("http://%s", broker); // no local data: build an http URL from the name
+        }
+        String nonTlsUrl = brokerData.getWebServiceUrl(); // preferred; TLS URL is used only when this is null
+        return nonTlsUrl != null ? nonTlsUrl : brokerData.getWebServiceUrlTls();
+    }
+    
     @Override
     public List<Metrics> getLoadBalancingMetrics() {
         return Collections.emptyList();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 9f9b949..88bfd51 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -225,8 +225,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
             brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar);
         }
         this.policies = new SimpleResourceAllocationPolicies(pulsar);
-        lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
-                pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+        lastLoadReport = new LoadReport(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+                pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
         lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
         lastLoadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
 
@@ -283,8 +283,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
                     // ignore the exception, node might be present already
                 }
             }
-            String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + conf.getWebServicePort().get();
-            brokerZnodePath = LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+            String lookupServiceAddress = getBrokerAddress();
+;            brokerZnodePath = LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
             LoadReport loadReport = null;
             try {
                 loadReport = generateLoadReport();
@@ -753,7 +753,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
             this.resourceUnitRankings = newResourceUnitRankings;
         } else {
             log.info("Leader broker[{}] No ResourceUnits to rank this run, Using Old Ranking",
-                    pulsar.getWebServiceAddress());
+                    pulsar.getSafeWebServiceAddress());
         }
     }
 
@@ -919,8 +919,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
             }
             brokerCandidateCache.clear();
             try {
-                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, availableBrokersCache,
-                        brokerTopicLoadingPredicate);
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
+                        availableBrokersCache, brokerTopicLoadingPredicate);
             } catch (Exception e) {
                 log.warn("Error when trying to apply policies: {}", e);
                 for (final Map.Entry<Long, Set<ResourceUnit>> entry : availableBrokers.entrySet()) {
@@ -1110,12 +1110,12 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
     private LoadReport generateLoadReportForcefully() throws Exception {
         synchronized (bundleGainsCache) {
             try {
-                LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
-                        pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+                LoadReport loadReport = new LoadReport(pulsar.getSafeWebServiceAddress(),
+                        pulsar.getWebServiceAddressTls(), pulsar.getSafeBrokerServiceUrl(),
+                        pulsar.getBrokerServiceUrlTls());
                 loadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
                 loadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
-                loadReport.setName(String.format("%s:%s", pulsar.getAdvertisedAddress(),
-                        pulsar.getConfiguration().getWebServicePort().get()));
+                loadReport.setName(getBrokerAddress());
                 loadReport.setBrokerVersionString(pulsar.getBrokerVersion());
 
                 SystemResourceUsage systemResourceUsage = this.getSystemResourceUsage();
@@ -1183,6 +1183,13 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
         }
     }
 
+    private String getBrokerAddress() { // "<advertisedAddress>:<web port>", TLS web port as fallback
+        return String.format("%s:%s", pulsar.getAdvertisedAddress(),
+                pulsar.getConfiguration().getWebServicePort().isPresent()
+                        ? pulsar.getConfiguration().getWebServicePort().get()
+                        : pulsar.getConfiguration().getWebServicePortTls().get()); // unwrap: not "Optional[..]"
+    }
+
     @Override
     public void setLoadReportForceUpdateFlag() {
         this.forceLoadReportUpdate = true;
@@ -1236,9 +1243,11 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
 
                     // calculate percentage of change
                     double cpuChange = (newUsage.cpu.limit > 0)
-                            ? ((newUsage.cpu.usage - oldUsage.cpu.usage) * 100 / newUsage.cpu.limit) : 0;
+                            ? ((newUsage.cpu.usage - oldUsage.cpu.usage) * 100 / newUsage.cpu.limit)
+                            : 0;
                     double memChange = (newUsage.memory.limit > 0)
-                            ? ((newUsage.memory.usage - oldUsage.memory.usage) * 100 / newUsage.memory.limit) : 0;
+                            ? ((newUsage.memory.usage - oldUsage.memory.usage) * 100 / newUsage.memory.limit)
+                            : 0;
                     double directMemChange = (newUsage.directMemory.limit > 0)
                             ? ((newUsage.directMemory.usage - oldUsage.directMemory.usage) * 100
                                     / newUsage.directMemory.limit)
@@ -1258,10 +1267,9 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
 
                     if (resourceChange > pulsar.getConfiguration().getLoadBalancerReportUpdateThresholdPercentage()) {
                         needUpdate = true;
-                        log.info("LoadReport update triggered by change on resource usage, detal ({}).",
-                                String.format(
-                                        "cpu: %.1f%%, mem: %.1f%%, directMemory: %.1f%%, bandwidthIn: %.1f%%, bandwidthOut: %.1f%%)",
-                                        cpuChange, memChange, directMemChange, bandwidthInChange, bandwidthOutChange));
+                        log.info("LoadReport update triggered by change on resource usage, detal ({}).", String.format(
+                                "cpu: %.1f%%, mem: %.1f%%, directMemory: %.1f%%, bandwidthIn: %.1f%%, bandwidthOut: %.1f%%)",
+                                cpuChange, memChange, directMemChange, bandwidthInChange, bandwidthOutChange));
                     }
                 }
             }
@@ -1435,8 +1443,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
                 if (stats.topics <= 1) {
                     log.info("Unable to split hot namespace bundle {} since there is only one topic.", bundleName);
                 } else {
-                    NamespaceName namespaceName = NamespaceName.get(
-                            LoadManagerShared.getNamespaceNameFromBundleName(bundleName));
+                    NamespaceName namespaceName = NamespaceName
+                            .get(LoadManagerShared.getNamespaceNameFromBundleName(bundleName));
                     int numBundles = pulsar.getNamespaceService().getBundleCount(namespaceName);
                     if (numBundles >= maxBundleCount) {
                         log.info("Unable to split hot namespace bundle {} since the namespace has too many bundles.",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 2d9c6ec..5fbc20f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -148,7 +148,7 @@ public class NamespaceService {
         host = pulsar.getAdvertisedAddress();
         this.config = pulsar.getConfiguration();
         this.loadManager = pulsar.getLoadManager();
-        ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getBrokerServiceUrl());
+        ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getSafeBrokerServiceUrl());
         this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
         this.ownershipCache = new OwnershipCache(pulsar, bundleFactory);
         this.namespaceClients = new ConcurrentOpenHashMap<>();
@@ -259,7 +259,7 @@ public class NamespaceService {
      */
     private boolean registerNamespace(String namespace, boolean ensureOwned) throws PulsarServerException {
 
-        String myUrl = pulsar.getBrokerServiceUrl();
+        String myUrl = pulsar.getSafeBrokerServiceUrl();
 
         try {
             NamespaceName nsname = NamespaceName.get(namespace);
@@ -394,7 +394,7 @@ public class NamespaceService {
                 } else {
                     if (authoritative) {
                         // leader broker already assigned the current broker as owner
-                        candidateBroker = pulsar.getWebServiceAddress();
+                        candidateBroker = pulsar.getSafeWebServiceAddress();
                     } else {
                         // forward to leader broker to make assignment
                         candidateBroker = pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl();
@@ -410,7 +410,7 @@ public class NamespaceService {
         try {
             checkNotNull(candidateBroker);
 
-            if (pulsar.getWebServiceAddress().equals(candidateBroker)) {
+            if (pulsar.getSafeWebServiceAddress().equals(candidateBroker)) {
                 // invalidate namespace policies and try to load latest policies to avoid data-discrepancy if broker
                 // doesn't receive watch on policies changes
                 final String policyPath = AdminResource.path(POLICIES, bundle.getNamespaceObject().toString());
@@ -522,7 +522,7 @@ public class NamespaceService {
 
         String lookupAddress = leastLoadedBroker.get().getResourceId();
         if (LOG.isDebugEnabled()) {
-            LOG.debug("{} : redirecting to the least loaded broker, lookup address={}", pulsar.getWebServiceAddress(),
+            LOG.debug("{} : redirecting to the least loaded broker, lookup address={}", pulsar.getSafeWebServiceAddress(),
                     lookupAddress);
         }
         return Optional.of(lookupAddress);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index f7dce01..f48362a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -149,12 +149,12 @@ public class OwnershipCache {
      *            the local broker URL that will be set as owner for the <code>ServiceUnit</code>
      */
     public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory) {
-        this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
+        this.ownerBrokerUrl = pulsar.getSafeBrokerServiceUrl();
         this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
         this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
-                pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), false);
+                pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), false);
         this.selfOwnerInfoDisabled = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
-                pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), true);
+                pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), true);
         this.bundleFactory = bundleFactory;
         this.localZkCache = pulsar.getLocalZkCache();
         this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 97eca09..a03849d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -749,7 +749,7 @@ public abstract class PulsarWebResource {
 
         String leaderAddress = pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl();
 
-        String myAddress = pulsar.getWebServiceAddress();
+        String myAddress = pulsar.getSafeWebServiceAddress();
 
         return myAddress.equals(leaderAddress); // If i am the leader, my decisions are
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 3ae55bc..61c7ccb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -187,7 +187,7 @@ public class WebService implements AutoCloseable {
 
             server.start();
 
-            log.info("Web Service started at {}", pulsar.getWebServiceAddress());
+            log.info("Web Service started at {}", pulsar.getSafeWebServiceAddress());
         } catch (Exception e) {
             throw new PulsarServerException(e);
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index f32d0b5..c615a09 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -618,7 +618,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                 new byte[0], null, null);
 
         // setup ownership to localhost
-        URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, false, false, false);
         doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
         try {
@@ -669,7 +669,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testDeleteNamespaceWithBundles() throws Exception {
-        URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
         String bundledNsLocal = "test-bundled-namespace-1";
         BundlesData bundleData = new BundlesData(Lists.newArrayList("0x00000000", "0x80000000", "0xffffffff"));
         createBundledTestNamespaces(this.testTenant, this.testLocalCluster, bundledNsLocal, bundleData);
@@ -791,7 +791,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testUnloadNamespaces() throws Exception {
         final NamespaceName testNs = this.testLocalNamespaces.get(1);
-        URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
                 .getWebServiceUrl(Mockito.argThat(new Matcher<NamespaceBundle>() {
 
@@ -851,7 +851,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testSplitBundles() throws Exception {
-        URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
         String bundledNsLocal = "test-bundled-namespace-1";
         BundlesData bundleData = new BundlesData(Lists.newArrayList("0x00000000", "0xffffffff"));
         createBundledTestNamespaces(this.testTenant, this.testLocalCluster, bundledNsLocal, bundleData);
@@ -882,7 +882,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testSplitBundleWithUnDividedRange() throws Exception {
-        URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
         String bundledNsLocal = "test-bundled-namespace-1";
         BundlesData bundleData = new BundlesData(
                 Lists.newArrayList("0x00000000", "0x08375b1a", "0x08375b1b", "0xffffffff"));
@@ -907,7 +907,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testUnloadNamespaceWithBundles() throws Exception {
-        URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
         String bundledNsLocal = "test-bundled-namespace-1";
         BundlesData bundleData = new BundlesData(Lists.newArrayList("0x00000000", "0x80000000", "0xffffffff"));
         createBundledTestNamespaces(this.testTenant, this.testLocalCluster, bundledNsLocal, bundleData);
@@ -1018,7 +1018,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testValidateNamespaceOwnershipWithBundles() throws Exception {
         try {
-            URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+            URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
             String bundledNsLocal = "test-bundled-namespace-1";
             BundlesData bundleData = new BundlesData(Lists.newArrayList("0x00000000", "0xffffffff"));
             createBundledTestNamespaces(this.testTenant, this.testLocalCluster, bundledNsLocal, bundleData);
@@ -1040,7 +1040,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testRetention() throws Exception {
         try {
-            URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+            URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
             String bundledNsLocal = "test-bundled-namespace-1";
             BundlesData bundleData = new BundlesData(Lists.newArrayList("0x00000000", "0xffffffff"));
             createBundledTestNamespaces(this.testTenant, this.testLocalCluster, bundledNsLocal, bundleData);
@@ -1098,7 +1098,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testValidateTopicOwnership() throws Exception {
-        URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
         String bundledNsLocal = "test-bundled-namespace-1";
         BundlesData bundleData = new BundlesData(Lists.newArrayList("0x00000000", "0xffffffff"));
         createBundledTestNamespaces(this.testTenant, this.testLocalCluster, bundledNsLocal, bundleData);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
index 39cbf32..176c851 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
@@ -651,7 +651,7 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest {
 
         final String cluster = pulsar.getConfiguration().getClusterName();
         admin.clusters().createCluster(cluster,
-                new ClusterData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls()));
+                new ClusterData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls()));
         // create
         FailureDomain domain = new FailureDomain();
         domain.setBrokers(Sets.newHashSet("b1", "b2", "b3"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 1672bdf..cb9f98f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -511,7 +511,7 @@ public class AntiAffinityNamespaceGroupTest {
             admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
         }
 
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getWebServiceAddress()).build();
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getSafeWebServiceAddress()).build();
         Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://" + namespace + "0/my-topic1")
                 .create();
         ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) ((ModularLoadManagerWrapper) pulsar1
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 0399548..84f54a7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -151,7 +151,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
                         new Stat());
                 NamespaceEphemeralData node = ObjectMapperFactory.getThreadLocal().readValue(data,
                         NamespaceEphemeralData.class);
-                Assert.assertEquals(node.getNativeUrl(), this.pulsar.getBrokerServiceUrl());
+                Assert.assertEquals(node.getNativeUrl(), this.pulsar.getSafeBrokerServiceUrl());
             } catch (Exception e) {
                 fail("failed to setup ownership", e);
             }
@@ -421,7 +421,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
                         new Stat());
                 NamespaceEphemeralData node = ObjectMapperFactory.getThreadLocal().readValue(data,
                         NamespaceEphemeralData.class);
-                Assert.assertEquals(node.getNativeUrl(), this.pulsar.getBrokerServiceUrl());
+                Assert.assertEquals(node.getNativeUrl(), this.pulsar.getSafeBrokerServiceUrl());
             } catch (Exception e) {
                 fail("failed to setup ownership", e);
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index 65991ca..d616e21 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -96,8 +96,8 @@ public class OwnershipCacheTest {
         doReturn(Optional.ofNullable(new Integer(port))).when(config).getBrokerServicePort();
         doReturn(Optional.ofNullable(null)).when(config).getWebServicePort();
         doReturn(brokerService).when(pulsar).getBrokerService();
-        doReturn(webAddress(config)).when(pulsar).getWebServiceAddress();
-        doReturn(selfBrokerUrl).when(pulsar).getBrokerServiceUrl();
+        doReturn(webAddress(config)).when(pulsar).getSafeWebServiceAddress();
+        doReturn(selfBrokerUrl).when(pulsar).getSafeBrokerServiceUrl();
     }
 
     @AfterMethod
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
index 55d136a..988c65a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
@@ -71,13 +71,13 @@ public class AdvertisedAddressTest {
     @Test
     public void testAdvertisedAddress() throws Exception {
         Assert.assertEquals( pulsar.getAdvertisedAddress(), advertisedAddress );
-        Assert.assertEquals( pulsar.getBrokerServiceUrl(), String.format("pulsar://%s:%d", advertisedAddress, BROKER_SERVICE_PORT) );
-        Assert.assertEquals( pulsar.getWebServiceAddress(), String.format("http://%s:%d", advertisedAddress, BROKER_WEBSERVICE_PORT) );
+        Assert.assertEquals( pulsar.getSafeBrokerServiceUrl(), String.format("pulsar://%s:%d", advertisedAddress, BROKER_SERVICE_PORT) );
+        Assert.assertEquals( pulsar.getSafeWebServiceAddress(), String.format("http://%s:%d", advertisedAddress, BROKER_WEBSERVICE_PORT) );
         String brokerZkPath = String.format("/loadbalance/brokers/%s:%d", pulsar.getAdvertisedAddress(), BROKER_WEBSERVICE_PORT);
         String bkBrokerData = new String(bkEnsemble.getZkClient().getData(brokerZkPath, false, new Stat()), StandardCharsets.UTF_8);
         JsonObject jsonBkBrokerData = new Gson().fromJson(bkBrokerData, JsonObject.class);
-        Assert.assertEquals( jsonBkBrokerData.get("pulsarServiceUrl").getAsString(), pulsar.getBrokerServiceUrl() );
-        Assert.assertEquals( jsonBkBrokerData.get("webServiceUrl").getAsString(), pulsar.getWebServiceAddress() );
+        Assert.assertEquals( jsonBkBrokerData.get("pulsarServiceUrl").getAsString(), pulsar.getSafeBrokerServiceUrl() );
+        Assert.assertEquals( jsonBkBrokerData.get("webServiceUrl").getAsString(), pulsar.getSafeWebServiceAddress() );
     }
 
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
index 1581bb9..771ed50 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
@@ -83,7 +83,7 @@ public class PeerReplicatorTest extends ReplicatorTestBase {
     @Test(dataProvider = "lookupType", timeOut = 10000)
     public void testPeerClusterTopicLookup(String protocol) throws Exception {
 
-        // clean up peer-clusters
+     // clean up peer-clusters
         admin1.clusters().updatePeerClusterNames("r1", null);
         admin1.clusters().updatePeerClusterNames("r2", null);
         admin1.clusters().updatePeerClusterNames("r3", null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index e503c90..ca5ba68 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -208,11 +208,11 @@ public class ReplicatorTestBase {
 
         // Provision the global namespace
         admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), urlTls1.toString(),
-                pulsar1.getBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls()));
+                pulsar1.getSafeBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls()));
         admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), urlTls2.toString(),
-                pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()));
+                pulsar2.getSafeBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()));
         admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(),
-                pulsar3.getBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls()));
+                pulsar3.getSafeBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls()));
 
         admin1.tenants().createTenant("pulsar",
                 new TenantInfo(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3")));
@@ -222,9 +222,9 @@ public class ReplicatorTestBase {
         assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString());
         assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
         assertEquals(admin2.clusters().getCluster("r3").getServiceUrl(), url3.toString());
-        assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl());
-        assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl());
-        assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl());
+        assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getSafeBrokerServiceUrl());
+        assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getSafeBrokerServiceUrl());
+        assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getSafeBrokerServiceUrl());
 
         // Also create V1 namespace for compatibility check
         admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 22e0819..f0856b6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -287,7 +287,7 @@ public class WebServiceTest {
 
         try {
             pulsarAdmin.clusters().createCluster(config.getClusterName(),
-                    new ClusterData(pulsar.getWebServiceAddress()));
+                    new ClusterData(pulsar.getSafeWebServiceAddress()));
         } catch (ConflictException ce) {
             // This is OK.
         } finally {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index c9d92b6..49206ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -161,7 +161,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
 
         // mock: return Broker2 as a Least-loaded broker when leader receives request [3]
         doReturn(true).when(loadManager1).isCentralized();
-        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getWebServiceAddress(), null);
+        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getSafeWebServiceAddress(), null);
         doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
         doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
         loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
@@ -252,7 +252,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
 
         // mock: return Broker2 as a Least-loaded broker when leader receives request
         doReturn(true).when(loadManager2).isCentralized();
-        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getWebServiceAddress(), null);
+        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getSafeWebServiceAddress(), null);
         doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
         loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager2));
         /**** started broker-2 ****/
@@ -826,7 +826,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));
         // mock: return Broker1 as a Least-loaded broker when leader receives request [3]
         doReturn(true).when(loadManager1).isCentralized();
-        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddress(), null);
+        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getSafeWebServiceAddress(), null);
         doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
         doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
         loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
@@ -934,7 +934,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
             loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));
             // mock: return Broker1 as a Least-loaded broker when leader receives request [3]
             doReturn(true).when(loadManager1).isCentralized();
-            SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddress(), null);
+            SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getSafeWebServiceAddress(), null);
             Optional<ResourceUnit> res = Optional.of(resourceUnit);
             doReturn(res).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
             doReturn(res).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
@@ -1031,7 +1031,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         conf.setClientLibraryVersionCheckEnabled(true);
         startBroker();
 
-        URI brokerServiceUrl = new URI(pulsar.getWebServiceAddress());
+        URI brokerServiceUrl = new URI(pulsar.getSafeWebServiceAddress());
 
         URL url = brokerServiceUrl.toURL();
         String path = String.format("admin/%s/partitions", dest.getLookupName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 9d7392d..f70e6a1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -950,11 +950,11 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             admin3 = PulsarAdmin.builder().serviceHttpUrl(url3.toString()).build();
 
             // Provision the global namespace
-            admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getBrokerServiceUrl(),
+            admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getSafeBrokerServiceUrl(),
                     pulsar1.getBrokerServiceUrlTls()));
-            admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getBrokerServiceUrl(),
+            admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getSafeBrokerServiceUrl(),
                     pulsar1.getBrokerServiceUrlTls()));
-            admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getBrokerServiceUrl(),
+            admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getSafeBrokerServiceUrl(),
                     pulsar1.getBrokerServiceUrlTls()));
 
             admin1.clusters().createCluster("global", new ClusterData("http://global:8080"));
@@ -967,9 +967,9 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString());
             assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
             assertEquals(admin2.clusters().getCluster("r3").getServiceUrl(), url3.toString());
-            assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl());
-            assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl());
-            assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl());
+            assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getSafeBrokerServiceUrl());
+            assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getSafeBrokerServiceUrl());
+            assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getSafeBrokerServiceUrl());
             Thread.sleep(100);
             log.info("--- ReplicatorTestBase::setup completed ---");
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
index 167fc7a..01329d8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
@@ -54,7 +54,7 @@ public class ServiceUrlProviderTest extends ProducerConsumerBase {
     public void testCreateClientWithServiceUrlProvider() throws Exception {
 
         PulsarClient client = PulsarClient.builder()
-                .serviceUrlProvider(new TestServiceUrlProvider(pulsar.getBrokerServiceUrl()))
+                .serviceUrlProvider(new TestServiceUrlProvider(pulsar.getSafeBrokerServiceUrl()))
                 .statsInterval(1, TimeUnit.SECONDS)
                 .build();
         Assert.assertTrue(((PulsarClientImpl) client).getConfiguration().getServiceUrlProvider() instanceof TestServiceUrlProvider);
@@ -68,7 +68,7 @@ public class ServiceUrlProviderTest extends ProducerConsumerBase {
         for (int i = 0; i < 100; i++) {
             producer.send("Hello Pulsar[" + i + "]");
         }
-        client.updateServiceUrl(pulsar.getBrokerServiceUrl());
+        client.updateServiceUrl(pulsar.getSafeBrokerServiceUrl());
         for (int i = 100; i < 200; i++) {
             producer.send("Hello Pulsar[" + i + "]");
         }
@@ -87,7 +87,7 @@ public class ServiceUrlProviderTest extends ProducerConsumerBase {
     @Test
     public void testCreateClientWithAutoChangedServiceUrlProvider() throws Exception {
 
-        AutoChangedServiceUrlProvider serviceUrlProvider = new AutoChangedServiceUrlProvider(pulsar.getBrokerServiceUrl());
+        AutoChangedServiceUrlProvider serviceUrlProvider = new AutoChangedServiceUrlProvider(pulsar.getSafeBrokerServiceUrl());
 
         PulsarClient client = PulsarClient.builder()
                 .serviceUrlProvider(serviceUrlProvider)
@@ -109,26 +109,26 @@ public class ServiceUrlProviderTest extends ProducerConsumerBase {
         startBroker();
         PulsarService pulsarService2 = pulsar;
 
-        log.info("Pulsar1 = {}, Pulsar2 = {}", pulsarService1.getBrokerServiceUrl(), pulsarService2.getBrokerServiceUrl());
-        Assert.assertNotEquals(pulsarService1.getBrokerServiceUrl(), pulsarService2.getBrokerServiceUrl());
+        log.info("Pulsar1 = {}, Pulsar2 = {}", pulsarService1.getSafeBrokerServiceUrl(), pulsarService2.getSafeBrokerServiceUrl());
+        Assert.assertNotEquals(pulsarService1.getSafeBrokerServiceUrl(), pulsarService2.getSafeBrokerServiceUrl());
 
         log.info("Service url : producer = {}, consumer = {}",
             producer.getClient().getLookup().getServiceUrl(),
             consumer.getClient().getLookup().getServiceUrl());
 
-        Assert.assertEquals(producer.getClient().getLookup().getServiceUrl(), pulsarService1.getBrokerServiceUrl());
-        Assert.assertEquals(consumer.getClient().getLookup().getServiceUrl(), pulsarService1.getBrokerServiceUrl());
+        Assert.assertEquals(producer.getClient().getLookup().getServiceUrl(), pulsarService1.getSafeBrokerServiceUrl());
+        Assert.assertEquals(consumer.getClient().getLookup().getServiceUrl(), pulsarService1.getSafeBrokerServiceUrl());
 
         log.info("Changing service url from {} to {}",
-            pulsarService1.getBrokerServiceUrl(),
-            pulsarService2.getBrokerServiceUrl());
+            pulsarService1.getSafeBrokerServiceUrl(),
+            pulsarService2.getSafeBrokerServiceUrl());
 
-        serviceUrlProvider.onServiceUrlChanged(pulsarService2.getBrokerServiceUrl());
+        serviceUrlProvider.onServiceUrlChanged(pulsarService2.getSafeBrokerServiceUrl());
         log.info("Service url changed : producer = {}, consumer = {}",
             producer.getClient().getLookup().getServiceUrl(),
             consumer.getClient().getLookup().getServiceUrl());
-        Assert.assertEquals(producer.getClient().getLookup().getServiceUrl(), pulsarService2.getBrokerServiceUrl());
-        Assert.assertEquals(consumer.getClient().getLookup().getServiceUrl(), pulsarService2.getBrokerServiceUrl());
+        Assert.assertEquals(producer.getClient().getLookup().getServiceUrl(), pulsarService2.getSafeBrokerServiceUrl());
+        Assert.assertEquals(consumer.getClient().getLookup().getServiceUrl(), pulsarService2.getSafeBrokerServiceUrl());
         producer.close();
         consumer.close();
         client.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 9a26036..2d5c9f2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -407,7 +407,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
             }
 
             Client client = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class));
-            final String baseUrl = pulsar.getWebServiceAddress()
+            final String baseUrl = pulsar.getSafeWebServiceAddress()
                     .replace(Integer.toString(pulsar.getConfiguration().getWebServicePort().get()), (Integer.toString(port)))
                     + "/admin/v2/proxy-stats/";
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
index debe0f0..f0263bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
@@ -59,7 +59,7 @@ public class ProxyPublishConsumeWithoutZKTest extends ProducerConsumerBase {
         WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
         config.setWebServicePort(Optional.of(port));
         config.setClusterName("test");
-        config.setServiceUrl(pulsar.getWebServiceAddress());
+        config.setServiceUrl(pulsar.getSafeWebServiceAddress());
         config.setServiceUrlTls(pulsar.getWebServiceAddressTls());
         service = spy(new WebSocketService(config));
         doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();