You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/18 01:26:07 UTC
[pulsar] branch master updated: [pulsar-broker] Make non-tls
web/broker-service optional (#3501)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e9425c5 [pulsar-broker] Make non-tls web/broker-service optional (#3501)
e9425c5 is described below
commit e9425c52784e9def4b74c8b0fac6c52751f7f28b
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Jun 17 18:26:01 2019 -0700
[pulsar-broker] Make non-tls web/broker-service optional (#3501)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 1 +
.../PulsarConfigurationLoaderTest.java | 4 +-
.../broker/MessagingServiceShutdownHook.java | 4 +-
.../org/apache/pulsar/broker/PulsarService.java | 30 ++++++++++++--
.../apache/pulsar/broker/admin/AdminResource.java | 2 +-
.../broker/loadbalance/LeaderElectionService.java | 8 ++--
.../broker/loadbalance/ModularLoadManager.java | 8 ++++
.../pulsar/broker/loadbalance/NoopLoadManager.java | 4 +-
.../loadbalance/impl/ModularLoadManagerImpl.java | 23 ++++++++---
.../impl/ModularLoadManagerWrapper.java | 12 +++++-
.../loadbalance/impl/SimpleLoadManagerImpl.java | 46 +++++++++++++---------
.../pulsar/broker/namespace/NamespaceService.java | 10 ++---
.../pulsar/broker/namespace/OwnershipCache.java | 6 +--
.../pulsar/broker/web/PulsarWebResource.java | 2 +-
.../org/apache/pulsar/broker/web/WebService.java | 2 +-
.../apache/pulsar/broker/admin/NamespacesTest.java | 18 ++++-----
.../pulsar/broker/admin/v1/V1_AdminApiTest2.java | 2 +-
.../AntiAffinityNamespaceGroupTest.java | 2 +-
.../broker/namespace/NamespaceServiceTest.java | 4 +-
.../broker/namespace/OwnershipCacheTest.java | 4 +-
.../broker/service/AdvertisedAddressTest.java | 8 ++--
.../pulsar/broker/service/PeerReplicatorTest.java | 2 +-
.../pulsar/broker/service/ReplicatorTestBase.java | 12 +++---
.../apache/pulsar/broker/web/WebServiceTest.java | 2 +-
.../pulsar/client/api/BrokerServiceLookupTest.java | 10 ++---
.../pulsar/client/api/NonPersistentTopicTest.java | 12 +++---
.../pulsar/client/api/ServiceUrlProviderTest.java | 24 +++++------
.../websocket/proxy/ProxyPublishConsumeTest.java | 2 +-
.../proxy/ProxyPublishConsumeWithoutZKTest.java | 2 +-
29 files changed, 163 insertions(+), 103 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 840117d..a0490b6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -108,6 +108,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_SERVER,
doc = "The port for serving binary protobuf requests"
)
+
private Optional<Integer> brokerServicePort = Optional.of(6650);
@FieldContext(
category = CATEGORY_SERVER,
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
index 9e0bfa9..cd7439e 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
@@ -25,8 +25,6 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-import io.netty.util.internal.PlatformDependent;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -227,4 +225,4 @@ public class PulsarConfigurationLoaderTest {
@FieldContext(minValue = 1, maxValue = 3)
long inValidMax = 4;
}
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
index 6cacaa5..e09a589 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
@@ -48,8 +48,8 @@ public class MessagingServiceShutdownHook extends Thread implements ShutdownServ
@Override
public void run() {
if (service.getConfiguration() != null) {
- LOG.info("messaging service shutdown hook started, lookup port="
- + service.getConfiguration().getWebServicePort().get() + ", broker url=" + service.getBrokerServiceUrl());
+ LOG.info("messaging service shutdown hook started, lookup webservice="
+ + service.getSafeWebServiceAddress() + ", broker url=" + service.getSafeBrokerServiceUrl());
}
ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("shutdown-thread"));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f4c258a..2dddcd5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -344,6 +344,14 @@ public class PulsarService implements AutoCloseable {
throw new PulsarServerException("Cannot start the service once it was stopped");
}
+ if (!config.getWebServicePort().isPresent() && !config.getWebServicePortTls().isPresent()) {
+ throw new IllegalArgumentException("webServicePort/webServicePortTls must be present");
+ }
+
+ if (!config.getBrokerServicePort().isPresent() && !config.getBrokerServicePortTls().isPresent()) {
+ throw new IllegalArgumentException("brokerServicePort/brokerServicePortTls must be present");
+ }
+
// Now we are ready to start services
localZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
config.getZookeeperServers(), config.getZooKeeperSessionTimeoutMillis());
@@ -445,9 +453,15 @@ public class PulsarService implements AutoCloseable {
// start function worker service if necessary
this.startWorkerService(brokerService.getAuthenticationService(), brokerService.getAuthorizationService());
- LOG.info("messaging service is ready, bootstrap service on port={}, broker url={}, cluster={}, configs={}",
- config.getWebServicePort().get(), brokerServiceUrl, config.getClusterName(),
- ReflectionToStringBuilder.toString(config));
+ final String bootstrapMessage = "bootstrap service "
+ + (config.getWebServicePort().isPresent() ? "port = " + config.getWebServicePort().get() : "")
+ + (config.getWebServicePortTls().isPresent() ? "tls-port = " + config.getWebServicePortTls() : "")
+ + (config.getBrokerServicePort().isPresent() ? "broker url= " + brokerServiceUrl : "")
+ + (config.getBrokerServicePortTls().isPresent() ? "broker url= " + brokerServiceUrlTls : "");
+ LOG.info("messaging service is ready");
+
+ LOG.info("messaging service is ready, {}, cluster={}, configs={}", bootstrapMessage,
+ config.getClusterName(), ReflectionToStringBuilder.toString(config));
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new PulsarServerException(e);
@@ -925,6 +939,10 @@ public class PulsarService implements AutoCloseable {
return advertisedAddress;
}
+ public String getSafeWebServiceAddress() {
+ return webServiceAddress != null ? webServiceAddress : webServiceAddressTls;
+ }
+
public String getWebServiceAddress() {
return webServiceAddress;
}
@@ -933,6 +951,10 @@ public class PulsarService implements AutoCloseable {
return webServiceAddressTls;
}
+ public String getSafeBrokerServiceUrl() {
+ return brokerServiceUrl != null ? brokerServiceUrl : brokerServiceUrlTls;
+ }
+
public String getBrokerServiceUrl() {
return brokerServiceUrl;
}
@@ -994,7 +1016,7 @@ public class PulsarService implements AutoCloseable {
// create cluster for function worker service
try {
NamedEntity.checkName(cluster);
- ClusterData clusterData = new ClusterData(this.getWebServiceAddress(), null /* serviceUrlTls */,
+ ClusterData clusterData = new ClusterData(this.getSafeWebServiceAddress(), null /* serviceUrlTls */,
brokerServiceUrl, null /* brokerServiceUrlTls */);
this.getGlobalZkCache().getZooKeeper().create(
AdminResource.path("clusters", cluster),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 2c87062..b4d4c36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -315,7 +315,7 @@ public abstract class AdminResource extends PulsarWebResource {
protected void validateBrokerName(String broker) throws MalformedURLException {
String brokerUrl = String.format("http://%s", broker);
String brokerUrlTls = String.format("https://%s", broker);
- if (!brokerUrl.equals(pulsar().getWebServiceAddress())
+ if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
&& !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
String[] parts = broker.split(":");
checkArgument(parts.length == 2, "Invalid broker url %s", broker);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
index 712b234..8e96db4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
@@ -104,7 +104,7 @@ public class LeaderElectionService {
public void run() {
// If the node is deleted, attempt the re-election
log.info("Broker [{}] is calling re-election from the thread",
- pulsar.getWebServiceAddress());
+ pulsar.getSafeWebServiceAddress());
elect();
}
});
@@ -125,13 +125,13 @@ public class LeaderElectionService {
// If broker comes here it is a follower. Do nothing, wait for the watch to trigger
log.info("Broker [{}] is the follower now. Waiting for the watch to trigger...",
- pulsar.getWebServiceAddress());
+ pulsar.getSafeWebServiceAddress());
} catch (NoNodeException nne) {
// There's no leader yet... try to become the leader
try {
// Create the root node and add current broker's URL as its contents
- LeaderBroker leaderBroker = new LeaderBroker(pulsar.getWebServiceAddress());
+ LeaderBroker leaderBroker = new LeaderBroker(pulsar.getSafeWebServiceAddress());
ZkUtils.createFullPathOptimistic(pulsar.getLocalZkCache().getZooKeeper(), ELECTION_ROOT,
jsonMapper.writeValueAsBytes(leaderBroker), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
@@ -141,7 +141,7 @@ public class LeaderElectionService {
// Notify the listener that this broker is now the leader so that it can collect usage and start load
// manager.
- log.info("Broker [{}] is the leader now, notifying the listener...", pulsar.getWebServiceAddress());
+ log.info("Broker [{}] is the leader now, notifying the listener...", pulsar.getSafeWebServiceAddress());
leaderListener.brokerIsTheLeaderNow();
} catch (NodeExistsException nee) {
// Re-elect the new leader
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
index 369bf5a..658aeba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
@@ -111,4 +111,12 @@ public interface ModularLoadManager {
* @return
*/
Set<String> getAvailableBrokers();
+
+ /**
+ * Fetch local-broker data from load-manager broker cache
+ *
+ * @param broker load-balancer zk-path
+ * @return
+ */
+ LocalBrokerData getBrokerLocalData(String broker);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
index 087eb28..d563177 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
@@ -59,8 +59,8 @@ public class NoopLoadManager implements LoadManager {
new PulsarResourceDescription());
zkClient = pulsar.getZkClient();
- localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
- pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+ localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+ pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 997d9bb..4d71da6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -263,10 +263,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
defaultStats.msgRateIn = DEFAULT_MESSAGE_RATE;
defaultStats.msgRateOut = DEFAULT_MESSAGE_RATE;
- lastData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
- pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
- localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
- pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+ lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+ pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+ localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+ pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
localData.setBrokerVersionString(pulsar.getBrokerVersion());
// configure broker-topic mode
lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
@@ -783,7 +783,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// Register the brokers in zk list
createZPathIfNotExists(zkClient, LoadManager.LOADBALANCE_BROKERS_ROOT);
- String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + conf.getWebServicePort().get();
+ String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
+ + (conf.getWebServicePort().isPresent() ? conf.getWebServicePort().get()
+ : conf.getWebServicePortTls().get());
brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + lookupServiceAddress;
updateLocalBrokerData();
@@ -962,4 +964,15 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
log.warn("Failed to get domain-list for cluster {}", e.getMessage());
}
}
+
+ @Override
+ public LocalBrokerData getBrokerLocalData(String broker) {
+ String key = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, broker);
+ try {
+ return brokerDataCache.get(key).orElse(null);
+ } catch (Exception e) {
+ log.warn("Failed to get local-broker data for {}",broker, e);
+ return null;
+ }
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index 82d99b6..fa4c4a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
@@ -68,13 +69,22 @@ public class ModularLoadManagerWrapper implements LoadManager {
public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
Optional<String> leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit);
if (leastLoadedBroker.isPresent()) {
- return Optional.of(new SimpleResourceUnit(String.format("http://%s", leastLoadedBroker.get()),
+ return Optional.of(new SimpleResourceUnit(getBrokerWebServiceUrl(leastLoadedBroker.get()),
new PulsarResourceDescription()));
} else {
return Optional.empty();
}
}
+ private String getBrokerWebServiceUrl(String broker) {
+ LocalBrokerData localData = (loadManager).getBrokerLocalData(broker);
+ if (localData != null) {
+ return localData.getWebServiceUrl() != null ? localData.getWebServiceUrl()
+ : localData.getWebServiceUrlTls();
+ }
+ return String.format("http://%s", broker);
+ }
+
@Override
public List<Metrics> getLoadBalancingMetrics() {
return Collections.emptyList();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 9f9b949..88bfd51 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -225,8 +225,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar);
}
this.policies = new SimpleResourceAllocationPolicies(pulsar);
- lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
- pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+ lastLoadReport = new LoadReport(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+ pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
lastLoadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
@@ -283,8 +283,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
// ignore the exception, node might be present already
}
}
- String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + conf.getWebServicePort().get();
- brokerZnodePath = LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+ String lookupServiceAddress = getBrokerAddress();
+; brokerZnodePath = LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
LoadReport loadReport = null;
try {
loadReport = generateLoadReport();
@@ -753,7 +753,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
this.resourceUnitRankings = newResourceUnitRankings;
} else {
log.info("Leader broker[{}] No ResourceUnits to rank this run, Using Old Ranking",
- pulsar.getWebServiceAddress());
+ pulsar.getSafeWebServiceAddress());
}
}
@@ -919,8 +919,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
}
brokerCandidateCache.clear();
try {
- LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, availableBrokersCache,
- brokerTopicLoadingPredicate);
+ LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
+ availableBrokersCache, brokerTopicLoadingPredicate);
} catch (Exception e) {
log.warn("Error when trying to apply policies: {}", e);
for (final Map.Entry<Long, Set<ResourceUnit>> entry : availableBrokers.entrySet()) {
@@ -1110,12 +1110,12 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
private LoadReport generateLoadReportForcefully() throws Exception {
synchronized (bundleGainsCache) {
try {
- LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
- pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+ LoadReport loadReport = new LoadReport(pulsar.getSafeWebServiceAddress(),
+ pulsar.getWebServiceAddressTls(), pulsar.getSafeBrokerServiceUrl(),
+ pulsar.getBrokerServiceUrlTls());
loadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
loadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
- loadReport.setName(String.format("%s:%s", pulsar.getAdvertisedAddress(),
- pulsar.getConfiguration().getWebServicePort().get()));
+ loadReport.setName(getBrokerAddress());
loadReport.setBrokerVersionString(pulsar.getBrokerVersion());
SystemResourceUsage systemResourceUsage = this.getSystemResourceUsage();
@@ -1183,6 +1183,13 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
}
}
+ private String getBrokerAddress() {
+ return String.format("%s:%s", pulsar.getAdvertisedAddress(),
+ pulsar.getConfiguration().getWebServicePort().isPresent()
+ ? pulsar.getConfiguration().getWebServicePort().get()
+ : pulsar.getConfiguration().getWebServicePortTls());
+ }
+
@Override
public void setLoadReportForceUpdateFlag() {
this.forceLoadReportUpdate = true;
@@ -1236,9 +1243,11 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
// calculate percentage of change
double cpuChange = (newUsage.cpu.limit > 0)
- ? ((newUsage.cpu.usage - oldUsage.cpu.usage) * 100 / newUsage.cpu.limit) : 0;
+ ? ((newUsage.cpu.usage - oldUsage.cpu.usage) * 100 / newUsage.cpu.limit)
+ : 0;
double memChange = (newUsage.memory.limit > 0)
- ? ((newUsage.memory.usage - oldUsage.memory.usage) * 100 / newUsage.memory.limit) : 0;
+ ? ((newUsage.memory.usage - oldUsage.memory.usage) * 100 / newUsage.memory.limit)
+ : 0;
double directMemChange = (newUsage.directMemory.limit > 0)
? ((newUsage.directMemory.usage - oldUsage.directMemory.usage) * 100
/ newUsage.directMemory.limit)
@@ -1258,10 +1267,9 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
if (resourceChange > pulsar.getConfiguration().getLoadBalancerReportUpdateThresholdPercentage()) {
needUpdate = true;
- log.info("LoadReport update triggered by change on resource usage, detal ({}).",
- String.format(
- "cpu: %.1f%%, mem: %.1f%%, directMemory: %.1f%%, bandwidthIn: %.1f%%, bandwidthOut: %.1f%%)",
- cpuChange, memChange, directMemChange, bandwidthInChange, bandwidthOutChange));
+ log.info("LoadReport update triggered by change on resource usage, detal ({}).", String.format(
+ "cpu: %.1f%%, mem: %.1f%%, directMemory: %.1f%%, bandwidthIn: %.1f%%, bandwidthOut: %.1f%%)",
+ cpuChange, memChange, directMemChange, bandwidthInChange, bandwidthOutChange));
}
}
}
@@ -1435,8 +1443,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
if (stats.topics <= 1) {
log.info("Unable to split hot namespace bundle {} since there is only one topic.", bundleName);
} else {
- NamespaceName namespaceName = NamespaceName.get(
- LoadManagerShared.getNamespaceNameFromBundleName(bundleName));
+ NamespaceName namespaceName = NamespaceName
+ .get(LoadManagerShared.getNamespaceNameFromBundleName(bundleName));
int numBundles = pulsar.getNamespaceService().getBundleCount(namespaceName);
if (numBundles >= maxBundleCount) {
log.info("Unable to split hot namespace bundle {} since the namespace has too many bundles.",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 2d9c6ec..5fbc20f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -148,7 +148,7 @@ public class NamespaceService {
host = pulsar.getAdvertisedAddress();
this.config = pulsar.getConfiguration();
this.loadManager = pulsar.getLoadManager();
- ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getBrokerServiceUrl());
+ ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getSafeBrokerServiceUrl());
this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
this.ownershipCache = new OwnershipCache(pulsar, bundleFactory);
this.namespaceClients = new ConcurrentOpenHashMap<>();
@@ -259,7 +259,7 @@ public class NamespaceService {
*/
private boolean registerNamespace(String namespace, boolean ensureOwned) throws PulsarServerException {
- String myUrl = pulsar.getBrokerServiceUrl();
+ String myUrl = pulsar.getSafeBrokerServiceUrl();
try {
NamespaceName nsname = NamespaceName.get(namespace);
@@ -394,7 +394,7 @@ public class NamespaceService {
} else {
if (authoritative) {
// leader broker already assigned the current broker as owner
- candidateBroker = pulsar.getWebServiceAddress();
+ candidateBroker = pulsar.getSafeWebServiceAddress();
} else {
// forward to leader broker to make assignment
candidateBroker = pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl();
@@ -410,7 +410,7 @@ public class NamespaceService {
try {
checkNotNull(candidateBroker);
- if (pulsar.getWebServiceAddress().equals(candidateBroker)) {
+ if (pulsar.getSafeWebServiceAddress().equals(candidateBroker)) {
// invalidate namespace policies and try to load latest policies to avoid data-discrepancy if broker
// doesn't receive watch on policies changes
final String policyPath = AdminResource.path(POLICIES, bundle.getNamespaceObject().toString());
@@ -522,7 +522,7 @@ public class NamespaceService {
String lookupAddress = leastLoadedBroker.get().getResourceId();
if (LOG.isDebugEnabled()) {
- LOG.debug("{} : redirecting to the least loaded broker, lookup address={}", pulsar.getWebServiceAddress(),
+ LOG.debug("{} : redirecting to the least loaded broker, lookup address={}", pulsar.getSafeWebServiceAddress(),
lookupAddress);
}
return Optional.of(lookupAddress);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index f7dce01..f48362a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -149,12 +149,12 @@ public class OwnershipCache {
* the local broker URL that will be set as owner for the <code>ServiceUnit</code>
*/
public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory) {
- this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
+ this.ownerBrokerUrl = pulsar.getSafeBrokerServiceUrl();
this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
- pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), false);
+ pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), false);
this.selfOwnerInfoDisabled = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
- pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), true);
+ pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), true);
this.bundleFactory = bundleFactory;
this.localZkCache = pulsar.getLocalZkCache();
this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 97eca09..a03849d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -749,7 +749,7 @@ public abstract class PulsarWebResource {
String leaderAddress = pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl();
- String myAddress = pulsar.getWebServiceAddress();
+ String myAddress = pulsar.getSafeWebServiceAddress();
return myAddress.equals(leaderAddress); // If i am the leader, my decisions are
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 3ae55bc..61c7ccb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -187,7 +187,7 @@ public class WebService implements AutoCloseable {
server.start();
- log.info("Web Service started at {}", pulsar.getWebServiceAddress());
+ log.info("Web Service started at {}", pulsar.getSafeWebServiceAddress());
} catch (Exception e) {
throw new PulsarServerException(e);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index f32d0b5..c615a09 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -618,7 +618,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
new byte[0], null, null);
// setup ownership to localhost
- URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+ URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, false, false, false);
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
try {
@@ -669,7 +669,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
@Test
public void testDeleteNamespaceWithBundles() throws Exception {
- URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+ URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
String bundledNsLocal = "test-bundled-namespace-1";
BundlesData bundleData = new BundlesData(Lists.newArrayList("0x00000000", "0x80000000", "0xffffffff"));
createBundledTestNamespaces(this.testTenant, this.testLocalCluster, bundledNsLocal, bundleData);
@@ -791,7 +791,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
@Test
public void testUnloadNamespaces() throws Exception {
final NamespaceName testNs = this.testLocalNamespaces.get(1);
- URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+ URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
.getWebServiceUrl(Mockito.argThat(new Matcher<NamespaceBundle>() {
@@ -851,7 +851,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
@Test
public void testSplitBundles() throws Exception {
- URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+ URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
String bundledNsLocal = "test-bundled-namespace-1";
BundlesData bundleData = new BundlesData(Lists.newArrayList("0x00000000", "0xffffffff"));
createBundledTestNamespaces(this.testTenant, this.testLocalCluster, bundledNsLocal, bundleData);
@@ -882,7 +882,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
@Test
public void testSplitBundleWithUnDividedRange() throws Exception {
- URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+ URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
String bundledNsLocal = "test-bundled-namespace-1";
BundlesData bundleData = new BundlesData(
Lists.newArrayList("0x00000000", "0x08375b1a", "0x08375b1b", "0xffffffff"));
@@ -907,7 +907,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
@Test
public void testUnloadNamespaceWithBundles() throws Exception {
- URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+ URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
String bundledNsLocal = "test-bundled-namespace-1";
BundlesData bundleData = new BundlesData(Lists.newArrayList("0x00000000", "0x80000000", "0xffffffff"));
createBundledTestNamespaces(this.testTenant, this.testLocalCluster, bundledNsLocal, bundleData);
@@ -1018,7 +1018,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
@Test
public void testValidateNamespaceOwnershipWithBundles() throws Exception {
try {
- URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+ URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
String bundledNsLocal = "test-bundled-namespace-1";
BundlesData bundleData = new BundlesData(Lists.newArrayList("0x00000000", "0xffffffff"));
createBundledTestNamespaces(this.testTenant, this.testLocalCluster, bundledNsLocal, bundleData);
@@ -1040,7 +1040,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
@Test
public void testRetention() throws Exception {
try {
- URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+ URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
String bundledNsLocal = "test-bundled-namespace-1";
BundlesData bundleData = new BundlesData(Lists.newArrayList("0x00000000", "0xffffffff"));
createBundledTestNamespaces(this.testTenant, this.testLocalCluster, bundledNsLocal, bundleData);
@@ -1098,7 +1098,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
@Test
public void testValidateTopicOwnership() throws Exception {
- URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
+ URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
String bundledNsLocal = "test-bundled-namespace-1";
BundlesData bundleData = new BundlesData(Lists.newArrayList("0x00000000", "0xffffffff"));
createBundledTestNamespaces(this.testTenant, this.testLocalCluster, bundledNsLocal, bundleData);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
index 39cbf32..176c851 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
@@ -651,7 +651,7 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest {
final String cluster = pulsar.getConfiguration().getClusterName();
admin.clusters().createCluster(cluster,
- new ClusterData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls()));
+ new ClusterData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls()));
// create
FailureDomain domain = new FailureDomain();
domain.setBrokers(Sets.newHashSet("b1", "b2", "b3"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 1672bdf..cb9f98f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -511,7 +511,7 @@ public class AntiAffinityNamespaceGroupTest {
admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
}
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getWebServiceAddress()).build();
+ PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getSafeWebServiceAddress()).build();
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://" + namespace + "0/my-topic1")
.create();
ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) ((ModularLoadManagerWrapper) pulsar1
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 0399548..84f54a7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -151,7 +151,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
new Stat());
NamespaceEphemeralData node = ObjectMapperFactory.getThreadLocal().readValue(data,
NamespaceEphemeralData.class);
- Assert.assertEquals(node.getNativeUrl(), this.pulsar.getBrokerServiceUrl());
+ Assert.assertEquals(node.getNativeUrl(), this.pulsar.getSafeBrokerServiceUrl());
} catch (Exception e) {
fail("failed to setup ownership", e);
}
@@ -421,7 +421,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
new Stat());
NamespaceEphemeralData node = ObjectMapperFactory.getThreadLocal().readValue(data,
NamespaceEphemeralData.class);
- Assert.assertEquals(node.getNativeUrl(), this.pulsar.getBrokerServiceUrl());
+ Assert.assertEquals(node.getNativeUrl(), this.pulsar.getSafeBrokerServiceUrl());
} catch (Exception e) {
fail("failed to setup ownership", e);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index 65991ca..d616e21 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -96,8 +96,8 @@ public class OwnershipCacheTest {
doReturn(Optional.ofNullable(new Integer(port))).when(config).getBrokerServicePort();
doReturn(Optional.ofNullable(null)).when(config).getWebServicePort();
doReturn(brokerService).when(pulsar).getBrokerService();
- doReturn(webAddress(config)).when(pulsar).getWebServiceAddress();
- doReturn(selfBrokerUrl).when(pulsar).getBrokerServiceUrl();
+ doReturn(webAddress(config)).when(pulsar).getSafeWebServiceAddress();
+ doReturn(selfBrokerUrl).when(pulsar).getSafeBrokerServiceUrl();
}
@AfterMethod
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
index 55d136a..988c65a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
@@ -71,13 +71,13 @@ public class AdvertisedAddressTest {
@Test
public void testAdvertisedAddress() throws Exception {
Assert.assertEquals( pulsar.getAdvertisedAddress(), advertisedAddress );
- Assert.assertEquals( pulsar.getBrokerServiceUrl(), String.format("pulsar://%s:%d", advertisedAddress, BROKER_SERVICE_PORT) );
- Assert.assertEquals( pulsar.getWebServiceAddress(), String.format("http://%s:%d", advertisedAddress, BROKER_WEBSERVICE_PORT) );
+ Assert.assertEquals( pulsar.getSafeBrokerServiceUrl(), String.format("pulsar://%s:%d", advertisedAddress, BROKER_SERVICE_PORT) );
+ Assert.assertEquals( pulsar.getSafeWebServiceAddress(), String.format("http://%s:%d", advertisedAddress, BROKER_WEBSERVICE_PORT) );
String brokerZkPath = String.format("/loadbalance/brokers/%s:%d", pulsar.getAdvertisedAddress(), BROKER_WEBSERVICE_PORT);
String bkBrokerData = new String(bkEnsemble.getZkClient().getData(brokerZkPath, false, new Stat()), StandardCharsets.UTF_8);
JsonObject jsonBkBrokerData = new Gson().fromJson(bkBrokerData, JsonObject.class);
- Assert.assertEquals( jsonBkBrokerData.get("pulsarServiceUrl").getAsString(), pulsar.getBrokerServiceUrl() );
- Assert.assertEquals( jsonBkBrokerData.get("webServiceUrl").getAsString(), pulsar.getWebServiceAddress() );
+ Assert.assertEquals( jsonBkBrokerData.get("pulsarServiceUrl").getAsString(), pulsar.getSafeBrokerServiceUrl() );
+ Assert.assertEquals( jsonBkBrokerData.get("webServiceUrl").getAsString(), pulsar.getSafeWebServiceAddress() );
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
index 1581bb9..771ed50 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
@@ -83,7 +83,7 @@ public class PeerReplicatorTest extends ReplicatorTestBase {
@Test(dataProvider = "lookupType", timeOut = 10000)
public void testPeerClusterTopicLookup(String protocol) throws Exception {
- // clean up peer-clusters
+ // clean up peer-clusters
admin1.clusters().updatePeerClusterNames("r1", null);
admin1.clusters().updatePeerClusterNames("r2", null);
admin1.clusters().updatePeerClusterNames("r3", null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index e503c90..ca5ba68 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -208,11 +208,11 @@ public class ReplicatorTestBase {
// Provision the global namespace
admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), urlTls1.toString(),
- pulsar1.getBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls()));
+ pulsar1.getSafeBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls()));
admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), urlTls2.toString(),
- pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()));
+ pulsar2.getSafeBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()));
admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(),
- pulsar3.getBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls()));
+ pulsar3.getSafeBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls()));
admin1.tenants().createTenant("pulsar",
new TenantInfo(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3")));
@@ -222,9 +222,9 @@ public class ReplicatorTestBase {
assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString());
assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
assertEquals(admin2.clusters().getCluster("r3").getServiceUrl(), url3.toString());
- assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl());
- assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl());
- assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl());
+ assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getSafeBrokerServiceUrl());
+ assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getSafeBrokerServiceUrl());
+ assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getSafeBrokerServiceUrl());
// Also create V1 namespace for compatibility check
admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 22e0819..f0856b6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -287,7 +287,7 @@ public class WebServiceTest {
try {
pulsarAdmin.clusters().createCluster(config.getClusterName(),
- new ClusterData(pulsar.getWebServiceAddress()));
+ new ClusterData(pulsar.getSafeWebServiceAddress()));
} catch (ConflictException ce) {
// This is OK.
} finally {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index c9d92b6..49206ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -161,7 +161,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
// mock: return Broker2 as a Least-loaded broker when leader receies request [3]
doReturn(true).when(loadManager1).isCentralized();
- SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getWebServiceAddress(), null);
+ SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getSafeWebServiceAddress(), null);
doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
@@ -252,7 +252,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
// mock: return Broker2 as a Least-loaded broker when leader receies request
doReturn(true).when(loadManager2).isCentralized();
- SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getWebServiceAddress(), null);
+ SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getSafeWebServiceAddress(), null);
doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager2));
/**** started broker-2 ****/
@@ -826,7 +826,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));
// mock: return Broker1 as a Least-loaded broker when leader receies request [3]
doReturn(true).when(loadManager1).isCentralized();
- SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddress(), null);
+ SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getSafeWebServiceAddress(), null);
doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
@@ -934,7 +934,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));
// mock: return Broker1 as a Least-loaded broker when leader receies request [3]
doReturn(true).when(loadManager1).isCentralized();
- SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddress(), null);
+ SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getSafeWebServiceAddress(), null);
Optional<ResourceUnit> res = Optional.of(resourceUnit);
doReturn(res).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
doReturn(res).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
@@ -1031,7 +1031,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
conf.setClientLibraryVersionCheckEnabled(true);
startBroker();
- URI brokerServiceUrl = new URI(pulsar.getWebServiceAddress());
+ URI brokerServiceUrl = new URI(pulsar.getSafeWebServiceAddress());
URL url = brokerServiceUrl.toURL();
String path = String.format("admin/%s/partitions", dest.getLookupName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 9d7392d..f70e6a1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -950,11 +950,11 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
admin3 = PulsarAdmin.builder().serviceHttpUrl(url3.toString()).build();
// Provision the global namespace
- admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getBrokerServiceUrl(),
+ admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getSafeBrokerServiceUrl(),
pulsar1.getBrokerServiceUrlTls()));
- admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getBrokerServiceUrl(),
+ admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getSafeBrokerServiceUrl(),
pulsar1.getBrokerServiceUrlTls()));
- admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getBrokerServiceUrl(),
+ admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getSafeBrokerServiceUrl(),
pulsar1.getBrokerServiceUrlTls()));
admin1.clusters().createCluster("global", new ClusterData("http://global:8080"));
@@ -967,9 +967,9 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString());
assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
assertEquals(admin2.clusters().getCluster("r3").getServiceUrl(), url3.toString());
- assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl());
- assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl());
- assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl());
+ assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getSafeBrokerServiceUrl());
+ assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getSafeBrokerServiceUrl());
+ assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getSafeBrokerServiceUrl());
Thread.sleep(100);
log.info("--- ReplicatorTestBase::setup completed ---");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
index 167fc7a..01329d8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
@@ -54,7 +54,7 @@ public class ServiceUrlProviderTest extends ProducerConsumerBase {
public void testCreateClientWithServiceUrlProvider() throws Exception {
PulsarClient client = PulsarClient.builder()
- .serviceUrlProvider(new TestServiceUrlProvider(pulsar.getBrokerServiceUrl()))
+ .serviceUrlProvider(new TestServiceUrlProvider(pulsar.getSafeBrokerServiceUrl()))
.statsInterval(1, TimeUnit.SECONDS)
.build();
Assert.assertTrue(((PulsarClientImpl) client).getConfiguration().getServiceUrlProvider() instanceof TestServiceUrlProvider);
@@ -68,7 +68,7 @@ public class ServiceUrlProviderTest extends ProducerConsumerBase {
for (int i = 0; i < 100; i++) {
producer.send("Hello Pulsar[" + i + "]");
}
- client.updateServiceUrl(pulsar.getBrokerServiceUrl());
+ client.updateServiceUrl(pulsar.getSafeBrokerServiceUrl());
for (int i = 100; i < 200; i++) {
producer.send("Hello Pulsar[" + i + "]");
}
@@ -87,7 +87,7 @@ public class ServiceUrlProviderTest extends ProducerConsumerBase {
@Test
public void testCreateClientWithAutoChangedServiceUrlProvider() throws Exception {
- AutoChangedServiceUrlProvider serviceUrlProvider = new AutoChangedServiceUrlProvider(pulsar.getBrokerServiceUrl());
+ AutoChangedServiceUrlProvider serviceUrlProvider = new AutoChangedServiceUrlProvider(pulsar.getSafeBrokerServiceUrl());
PulsarClient client = PulsarClient.builder()
.serviceUrlProvider(serviceUrlProvider)
@@ -109,26 +109,26 @@ public class ServiceUrlProviderTest extends ProducerConsumerBase {
startBroker();
PulsarService pulsarService2 = pulsar;
- log.info("Pulsar1 = {}, Pulsar2 = {}", pulsarService1.getBrokerServiceUrl(), pulsarService2.getBrokerServiceUrl());
- Assert.assertNotEquals(pulsarService1.getBrokerServiceUrl(), pulsarService2.getBrokerServiceUrl());
+ log.info("Pulsar1 = {}, Pulsar2 = {}", pulsarService1.getSafeBrokerServiceUrl(), pulsarService2.getSafeBrokerServiceUrl());
+ Assert.assertNotEquals(pulsarService1.getSafeBrokerServiceUrl(), pulsarService2.getSafeBrokerServiceUrl());
log.info("Service url : producer = {}, consumer = {}",
producer.getClient().getLookup().getServiceUrl(),
consumer.getClient().getLookup().getServiceUrl());
- Assert.assertEquals(producer.getClient().getLookup().getServiceUrl(), pulsarService1.getBrokerServiceUrl());
- Assert.assertEquals(consumer.getClient().getLookup().getServiceUrl(), pulsarService1.getBrokerServiceUrl());
+ Assert.assertEquals(producer.getClient().getLookup().getServiceUrl(), pulsarService1.getSafeBrokerServiceUrl());
+ Assert.assertEquals(consumer.getClient().getLookup().getServiceUrl(), pulsarService1.getSafeBrokerServiceUrl());
log.info("Changing service url from {} to {}",
- pulsarService1.getBrokerServiceUrl(),
- pulsarService2.getBrokerServiceUrl());
+ pulsarService1.getSafeBrokerServiceUrl(),
+ pulsarService2.getSafeBrokerServiceUrl());
- serviceUrlProvider.onServiceUrlChanged(pulsarService2.getBrokerServiceUrl());
+ serviceUrlProvider.onServiceUrlChanged(pulsarService2.getSafeBrokerServiceUrl());
log.info("Service url changed : producer = {}, consumer = {}",
producer.getClient().getLookup().getServiceUrl(),
consumer.getClient().getLookup().getServiceUrl());
- Assert.assertEquals(producer.getClient().getLookup().getServiceUrl(), pulsarService2.getBrokerServiceUrl());
- Assert.assertEquals(consumer.getClient().getLookup().getServiceUrl(), pulsarService2.getBrokerServiceUrl());
+ Assert.assertEquals(producer.getClient().getLookup().getServiceUrl(), pulsarService2.getSafeBrokerServiceUrl());
+ Assert.assertEquals(consumer.getClient().getLookup().getServiceUrl(), pulsarService2.getSafeBrokerServiceUrl());
producer.close();
consumer.close();
client.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 9a26036..2d5c9f2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -407,7 +407,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
}
Client client = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class));
- final String baseUrl = pulsar.getWebServiceAddress()
+ final String baseUrl = pulsar.getSafeWebServiceAddress()
.replace(Integer.toString(pulsar.getConfiguration().getWebServicePort().get()), (Integer.toString(port)))
+ "/admin/v2/proxy-stats/";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
index debe0f0..f0263bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
@@ -59,7 +59,7 @@ public class ProxyPublishConsumeWithoutZKTest extends ProducerConsumerBase {
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
config.setWebServicePort(Optional.of(port));
config.setClusterName("test");
- config.setServiceUrl(pulsar.getWebServiceAddress());
+ config.setServiceUrl(pulsar.getSafeWebServiceAddress());
config.setServiceUrlTls(pulsar.getWebServiceAddressTls());
service = spy(new WebSocketService(config));
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();