You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/01/21 20:49:07 UTC
(pulsar) 01/02: [fix][broker] Fix leader broker cannot be determined when the advertised address and advertised listeners are configured (#21894)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 940ede5ea635c9a51cd877cdb6b2d0ee07a651b4
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Sun Jan 21 22:44:23 2024 +0200
[fix][broker] Fix leader broker cannot be determined when the advertised address and advertised listeners are configured (#21894)
(cherry picked from commit 3158fd3550f9e3a0b2c0316c92265318b209f4f5)
---
.../org/apache/pulsar/broker/PulsarService.java | 23 +-
.../pulsar/broker/admin/impl/BrokersBase.java | 12 +-
.../pulsar/broker/admin/impl/NamespacesBase.java | 11 +-
.../pulsar/broker/loadbalance/LeaderBroker.java | 18 ++
.../broker/loadbalance/LeaderElectionService.java | 12 +-
.../pulsar/broker/loadbalance/NoopLoadManager.java | 14 +-
.../pulsar/broker/loadbalance/ResourceUnit.java | 2 -
.../loadbalance/extensions/BrokerRegistryImpl.java | 2 +-
.../extensions/ExtensibleLoadManagerImpl.java | 15 +-
.../channel/ServiceUnitStateChannelImpl.java | 38 +--
.../policies/IsolationPoliciesHelper.java | 8 +-
.../reporter/BrokerLoadDataReporter.java | 14 +-
.../reporter/TopBundleLoadDataReporter.java | 14 +-
.../broker/loadbalance/impl/LoadManagerShared.java | 99 +++----
.../loadbalance/impl/ModularLoadManagerImpl.java | 15 +-
.../impl/ModularLoadManagerWrapper.java | 21 +-
.../loadbalance/impl/SimpleLoadManagerImpl.java | 42 ++-
.../pulsar/broker/namespace/NamespaceService.java | 75 ++---
.../pulsar/broker/service/BrokerService.java | 2 +-
.../service/nonpersistent/NonPersistentTopic.java | 2 +-
.../broker/service/persistent/PersistentTopic.java | 2 +-
.../pulsar/broker/web/PulsarWebResource.java | 54 ++--
.../apache/pulsar/broker/SLAMonitoringTest.java | 16 +-
.../broker/admin/AdminApiMultiBrokersTest.java | 5 +-
.../apache/pulsar/broker/admin/AdminApiTest.java | 22 +-
.../org/apache/pulsar/broker/admin/AdminTest.java | 3 +-
.../pulsar/broker/admin/v1/V1_AdminApiTest.java | 2 +-
...isedListenersMultiBrokerLeaderElectionTest.java | 42 +++
.../loadbalance/LeaderElectionServiceTest.java | 3 +-
.../broker/loadbalance/LoadBalancerTest.java | 8 +-
.../loadbalance/MultiBrokerLeaderElectionTest.java | 94 +++++--
.../loadbalance/SimpleLoadManagerImplTest.java | 34 +--
.../loadbalance/extensions/BrokerRegistryTest.java | 4 +-
.../extensions/ExtensibleLoadManagerImplTest.java | 52 ++--
.../channel/ServiceUnitStateChannelTest.java | 312 ++++++++++-----------
.../filter/BrokerIsolationPoliciesFilterTest.java | 64 ++---
.../extensions/scheduler/TransferShedderTest.java | 296 +++++++++----------
.../impl/ModularLoadManagerImplTest.java | 130 ++++-----
.../broker/namespace/NamespaceServiceTest.java | 39 ++-
.../pulsar/broker/service/BrokerServiceTest.java | 10 +-
.../broker/service/InactiveTopicDeleteTest.java | 4 +-
.../systopic/PartitionedSystemTopicTest.java | 12 +-
.../broker/testcontext/PulsarTestContext.java | 45 ++-
.../pulsar/client/api/BrokerServiceLookupTest.java | 10 +-
.../apache/pulsar/compaction/CompactionTest.java | 4 +-
.../pulsar/common/policies/data/BrokerInfo.java | 2 +
.../common/policies/data/impl/BrokerInfoImpl.java | 9 +-
.../server/ProxyWithExtensibleLoadManagerTest.java | 2 +-
48 files changed, 941 insertions(+), 778 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index a04a4c137cc..42d43b3dcf2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -275,6 +275,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private TransactionPendingAckStoreProvider transactionPendingAckStoreProvider;
private final ExecutorProvider transactionExecutorProvider;
private final DefaultMonotonicSnapshotClock monotonicSnapshotClock;
+ private String brokerId;
public enum State {
Init, Started, Closing, Closed
@@ -307,6 +308,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
// Validate correctness of configuration
PulsarConfigurationLoader.isComplete(config);
TransactionBatchedWriteValidator.validate(config);
+ this.config = config;
// validate `advertisedAddress`, `advertisedListeners`, `internalListenerName`
this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
@@ -317,7 +319,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
// use `internalListenerName` listener as `advertisedAddress`
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
this.brokerVersion = PulsarVersion.getVersion();
- this.config = config;
this.processTerminator = processTerminator;
this.loadManagerExecutor = Executors
.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
@@ -828,6 +829,12 @@ public class PulsarService implements AutoCloseable, ShutdownService {
this.brokerServiceUrl = brokerUrl(config);
this.brokerServiceUrlTls = brokerUrlTls(config);
+ // the broker id is used in the load manager to identify the broker
+ this.brokerId =
+ String.format("%s:%s", advertisedAddress, config.getWebServicePortTls().isPresent()
+ ? config.getWebServicePortTls().get()
+ : config.getWebServicePort().orElseThrow());
+
if (this.compactionServiceFactory == null) {
this.compactionServiceFactory = loadCompactionServiceFactory();
}
@@ -1099,7 +1106,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
}
private void handleDeleteCluster(Notification notification) {
- if (ClusterResources.pathRepresentsClusterName(notification.getPath())
+ if (isRunning() && ClusterResources.pathRepresentsClusterName(notification.getPath())
&& notification.getType() == NotificationType.Deleted) {
final String clusterName = ClusterResources.clusterNameFromPath(notification.getPath());
getBrokerService().closeAndRemoveReplicationClient(clusterName);
@@ -1137,7 +1144,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
LOG.info("The load manager extension is enabled. Skipping PulsarService LeaderElectionService.");
return;
}
- this.leaderElectionService = new LeaderElectionService(coordinationService, getSafeWebServiceAddress(),
+ this.leaderElectionService =
+ new LeaderElectionService(coordinationService, getBrokerId(), getSafeWebServiceAddress(),
state -> {
if (state == LeaderElectionState.Leading) {
LOG.info("This broker was elected leader");
@@ -1185,7 +1193,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
protected void acquireSLANamespace() {
try {
// Namespace not created hence no need to unload it
- NamespaceName nsName = NamespaceService.getSLAMonitorNamespace(getLookupServiceAddress(), config);
+ NamespaceName nsName = NamespaceService.getSLAMonitorNamespace(getBrokerId(), config);
if (!this.pulsarResources.getNamespaceResources().namespaceExists(nsName)) {
LOG.info("SLA Namespace = {} doesn't exist.", nsName);
return;
@@ -1694,10 +1702,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
return brokerServiceUrlTls != null ? brokerServiceUrlTls : brokerServiceUrl;
}
- public String getLookupServiceAddress() {
- return String.format("%s:%s", advertisedAddress, config.getWebServicePortTls().isPresent()
- ? config.getWebServicePortTls().get()
- : config.getWebServicePort().orElseThrow());
+ public String getBrokerId() {
+ return Objects.requireNonNull(brokerId,
+ "brokerId is not initialized before start has been called");
}
public synchronized void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 3fb1941b33a..ad3d7e789e4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -141,7 +141,9 @@ public class BrokersBase extends AdminResource {
validateSuperUserAccessAsync().thenAccept(__ -> {
LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader()
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find leader broker"));
- BrokerInfo brokerInfo = BrokerInfo.builder().serviceUrl(leaderBroker.getServiceUrl()).build();
+ BrokerInfo brokerInfo = BrokerInfo.builder()
+ .serviceUrl(leaderBroker.getServiceUrl())
+ .brokerId(leaderBroker.getBrokerId()).build();
LOG.info("[{}] Successfully to get the information of the leader broker.", clientAppId());
asyncResponse.resume(brokerInfo);
})
@@ -164,7 +166,7 @@ public class BrokersBase extends AdminResource {
@PathParam("clusterName") String cluster,
@PathParam("broker-webserviceurl") String broker) {
validateSuperUserAccessAsync()
- .thenAccept(__ -> validateBrokerName(broker))
+ .thenCompose(__ -> maybeRedirectToBroker(broker))
.thenCompose(__ -> validateClusterOwnershipAsync(cluster))
.thenCompose(__ -> pulsar().getNamespaceService().getOwnedNameSpacesStatusAsync())
.thenAccept(asyncResponse::resume)
@@ -396,10 +398,10 @@ public class BrokersBase extends AdminResource {
private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) {
- String lookupServiceAddress = pulsar().getLookupServiceAddress();
+ String brokerId = pulsar().getBrokerId();
NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
- ? NamespaceService.getHeartbeatNamespaceV2(lookupServiceAddress, pulsar().getConfiguration())
- : NamespaceService.getHeartbeatNamespace(lookupServiceAddress, pulsar().getConfiguration());
+ ? NamespaceService.getHeartbeatNamespaceV2(brokerId, pulsar().getConfiguration())
+ : NamespaceService.getHeartbeatNamespace(brokerId, pulsar().getConfiguration());
final String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName);
final String messageStr = UUID.randomUUID().toString();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index caaff010439..f274cffa46b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -923,9 +923,9 @@ public abstract class NamespacesBase extends AdminResource {
return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, errorStr));
}
LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get();
- String leaderBrokerUrl = leaderBroker.getServiceUrl();
+ String leaderBrokerId = leaderBroker.getBrokerId();
return pulsar().getNamespaceService()
- .createLookupResult(leaderBrokerUrl, false, null)
+ .createLookupResult(leaderBrokerId, false, null)
.thenCompose(lookupResult -> {
String redirectUrl = isRequestHttps() ? lookupResult.getLookupData().getHttpUrlTls()
: lookupResult.getLookupData().getHttpUrl();
@@ -948,7 +948,7 @@ public abstract class NamespacesBase extends AdminResource {
return FutureUtil.failedFuture((
new WebApplicationException(Response.temporaryRedirect(redirect).build())));
} catch (MalformedURLException exception) {
- log.error("The leader broker url is malformed - {}", leaderBrokerUrl);
+ log.error("The redirect url is malformed - {}", redirectUrl);
return FutureUtil.failedFuture(new RestException(exception));
}
});
@@ -984,8 +984,11 @@ public abstract class NamespacesBase extends AdminResource {
}
public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String bundleRange,
- String destinationBroker,
+ String destinationBrokerParam,
boolean authoritative) {
+ String destinationBroker = StringUtils.isBlank(destinationBrokerParam) ? null :
+ // ensure backward compatibility: strip the possible http:// or https:// prefix
+ destinationBrokerParam.replaceFirst("http[s]?://", "");
return validateSuperUserAccessAsync()
.thenCompose(__ -> setNamespaceBundleAffinityAsync(bundleRange, destinationBroker))
.thenAccept(__ -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderBroker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderBroker.java
index acd34e151ed..d7c21de5ea1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderBroker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderBroker.java
@@ -30,5 +30,23 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
public class LeaderBroker {
+ private String brokerId;
private String serviceUrl;
+
+ public String getBrokerId() {
+ if (brokerId != null) {
+ return brokerId;
+ } else {
+ // for backward compatibility at runtime with older versions of Pulsar
+ return parseHostAndPort(serviceUrl);
+ }
+ }
+
+ private static String parseHostAndPort(String serviceUrl) {
+ int uriSeparatorPos = serviceUrl.indexOf("://");
+ if (uriSeparatorPos == -1) {
+ throw new IllegalArgumentException("'" + serviceUrl + "' isn't an URI.");
+ }
+ return serviceUrl.substring(uriSeparatorPos + 3);
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
index 05fe4353f3e..2e53b54e98f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
@@ -35,17 +35,17 @@ public class LeaderElectionService implements AutoCloseable {
private final LeaderElection<LeaderBroker> leaderElection;
private final LeaderBroker localValue;
- public LeaderElectionService(CoordinationService cs, String localWebServiceAddress,
- Consumer<LeaderElectionState> listener) {
- this(cs, localWebServiceAddress, ELECTION_ROOT, listener);
+ public LeaderElectionService(CoordinationService cs, String brokerId,
+ String serviceUrl, Consumer<LeaderElectionState> listener) {
+ this(cs, brokerId, serviceUrl, ELECTION_ROOT, listener);
}
public LeaderElectionService(CoordinationService cs,
- String localWebServiceAddress,
- String electionRoot,
+ String brokerId,
+ String serviceUrl, String electionRoot,
Consumer<LeaderElectionState> listener) {
this.leaderElection = cs.getLeaderElection(LeaderBroker.class, electionRoot, listener);
- this.localValue = new LeaderBroker(localWebServiceAddress);
+ this.localValue = new LeaderBroker(brokerId, serviceUrl);
}
public void start() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
index 80f887d394d..f9f36b705d4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
@@ -43,7 +43,7 @@ import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
public class NoopLoadManager implements LoadManager {
private PulsarService pulsar;
- private String lookupServiceAddress;
+ private String brokerId;
private ResourceUnit localResourceUnit;
private LockManager<LocalBrokerData> lockManager;
private Map<String, String> bundleBrokerAffinityMap;
@@ -57,16 +57,15 @@ public class NoopLoadManager implements LoadManager {
@Override
public void start() throws PulsarServerException {
- lookupServiceAddress = pulsar.getLookupServiceAddress();
- localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
- new PulsarResourceDescription());
+ brokerId = pulsar.getBrokerId();
+ localResourceUnit = new SimpleResourceUnit(brokerId, new PulsarResourceDescription());
LocalBrokerData localData = new LocalBrokerData(pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
localData.setProtocols(pulsar.getProtocolDataToAdvertise());
localData.setLoadManagerClassName(this.pulsar.getConfig().getLoadManagerClassName());
- String brokerReportPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+ String brokerReportPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + brokerId;
try {
log.info("Acquiring broker resource lock on {}", brokerReportPath);
@@ -129,12 +128,12 @@ public class NoopLoadManager implements LoadManager {
@Override
public Set<String> getAvailableBrokers() throws Exception {
- return Collections.singleton(lookupServiceAddress);
+ return Collections.singleton(brokerId);
}
@Override
public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
- return CompletableFuture.completedFuture(Collections.singleton(lookupServiceAddress));
+ return CompletableFuture.completedFuture(Collections.singleton(brokerId));
}
@Override
@@ -153,7 +152,6 @@ public class NoopLoadManager implements LoadManager {
if (StringUtils.isBlank(broker)) {
return this.bundleBrokerAffinityMap.remove(bundle);
}
- broker = broker.replaceFirst("http[s]?://", "");
return this.bundleBrokerAffinityMap.put(bundle, broker);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java
index ef4dd2a97b2..c28a8be4c0d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java
@@ -23,8 +23,6 @@ package org.apache.pulsar.broker.loadbalance;
*/
public interface ResourceUnit extends Comparable<ResourceUnit> {
- String PROPERTY_KEY_BROKER_ZNODE_NAME = "__advertised_addr";
-
String getResourceId();
ResourceDescription getAvailableResource();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index bfdaa078f19..18e30ddf922 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -82,7 +82,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
this.scheduler = pulsar.getLoadManagerExecutor();
this.listeners = new ArrayList<>();
- this.brokerId = pulsar.getLookupServiceAddress();
+ this.brokerId = pulsar.getBrokerId();
this.brokerLookupData = new BrokerLookupData(
pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 581183cf95a..dee660e1c3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -348,7 +348,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
try {
this.brokerRegistry = new BrokerRegistryImpl(pulsar);
this.leaderElectionService = new LeaderElectionService(
- pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
+ pulsar.getCoordinationService(), pulsar.getBrokerId(),
+ pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
state -> {
pulsar.getLoadManagerExecutor().execute(() -> {
if (state == LeaderElectionState.Leading) {
@@ -790,7 +791,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
@VisibleForTesting
void playLeader() {
log.info("This broker:{} is setting the role from {} to {}",
- pulsar.getLookupServiceAddress(), role, Leader);
+ pulsar.getBrokerId(), role, Leader);
int retry = 0;
while (!Thread.currentThread().isInterrupted()) {
try {
@@ -807,7 +808,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
break;
} catch (Throwable e) {
log.error("The broker:{} failed to set the role. Retrying {} th ...",
- pulsar.getLookupServiceAddress(), ++retry, e);
+ pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
} catch (InterruptedException ex) {
@@ -818,7 +819,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
}
}
role = Leader;
- log.info("This broker:{} plays the leader now.", pulsar.getLookupServiceAddress());
+ log.info("This broker:{} plays the leader now.", pulsar.getBrokerId());
// flush the load data when the leader is elected.
brokerLoadDataReporter.reportAsync(true);
@@ -828,7 +829,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
@VisibleForTesting
void playFollower() {
log.info("This broker:{} is setting the role from {} to {}",
- pulsar.getLookupServiceAddress(), role, Follower);
+ pulsar.getBrokerId(), role, Follower);
int retry = 0;
while (!Thread.currentThread().isInterrupted()) {
try {
@@ -841,7 +842,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
break;
} catch (Throwable e) {
log.error("The broker:{} failed to set the role. Retrying {} th ...",
- pulsar.getLookupServiceAddress(), ++retry, e);
+ pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
} catch (InterruptedException ex) {
@@ -852,7 +853,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
}
}
role = Follower;
- log.info("This broker:{} plays a follower now.", pulsar.getLookupServiceAddress());
+ log.info("This broker:{} plays a follower now.", pulsar.getBrokerId());
// flush the load data when the leader is elected.
brokerLoadDataReporter.reportAsync(true);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index bd571284346..8704509c17c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.PulsarClusterMetadataSetup;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
@@ -124,7 +125,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private final ServiceConfiguration config;
private final Schema<ServiceUnitStateData> schema;
private final Map<String, CompletableFuture<String>> getOwnerRequests;
- private final String lookupServiceAddress;
+ private final String brokerId;
private final Map<String, CompletableFuture<Void>> cleanupJobs;
private final StateChangeListeners stateChangeListeners;
private ExtensibleLoadManagerImpl loadManager;
@@ -201,7 +202,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
public ServiceUnitStateChannelImpl(PulsarService pulsar) {
this.pulsar = pulsar;
this.config = pulsar.getConfig();
- this.lookupServiceAddress = pulsar.getLookupServiceAddress();
+ this.brokerId = pulsar.getBrokerId();
this.schema = Schema.JSON(ServiceUnitStateData.class);
this.getOwnerRequests = new ConcurrentHashMap<>();
this.cleanupJobs = new ConcurrentHashMap<>();
@@ -249,7 +250,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
},
0, ownershipMonitorDelayTimeInSecs, SECONDS);
log.info("This leader broker:{} started the ownership monitor.",
- lookupServiceAddress);
+ brokerId);
}
}
@@ -258,13 +259,13 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
monitorTask.cancel(false);
monitorTask = null;
log.info("This previous leader broker:{} stopped the ownership monitor.",
- lookupServiceAddress);
+ brokerId);
}
}
@Override
public void cleanOwnerships() {
- doCleanup(lookupServiceAddress);
+ doCleanup(brokerId);
}
public synchronized void start() throws PulsarServerException {
@@ -430,19 +431,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
new IllegalStateException("Invalid channel state:" + channelState.name()));
}
- return leaderElectionService.readCurrentLeader().thenApply(leader -> {
- //expecting http://broker-xyz:port
- // TODO: discard this protocol prefix removal
- // by a util func that returns lookupServiceAddress(serviceUrl)
- if (leader.isPresent()) {
- String broker = leader.get().getServiceUrl();
- broker = broker.substring(broker.lastIndexOf('/') + 1);
- return Optional.of(broker);
- } else {
- return Optional.empty();
- }
- }
- );
+ return leaderElectionService.readCurrentLeader()
+ .thenApply(leader -> leader.map(LeaderBroker::getBrokerId));
}
public CompletableFuture<Boolean> isChannelOwnerAsync() {
@@ -484,7 +474,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
}
public boolean isOwner(String serviceUnit) {
- return isOwner(serviceUnit, lookupServiceAddress);
+ return isOwner(serviceUnit, brokerId);
}
private CompletableFuture<Optional<String>> getActiveOwnerAsync(
@@ -672,7 +662,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
long totalHandledRequests = getHandlerTotalCounter(data).incrementAndGet();
if (debug()) {
log.info("{} received a handle request for serviceUnit:{}, data:{}. totalHandledRequests:{}",
- lookupServiceAddress, serviceUnit, data, totalHandledRequests);
+ brokerId, serviceUnit, data, totalHandledRequests);
}
ServiceUnitState state = state(data);
@@ -736,7 +726,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
long handlerFailureCount = getHandlerFailureCounter(data).get();
log.info("{} handled {} event for serviceUnit:{}, cur:{}, next:{}, "
+ "totalHandledRequests:{}, totalFailedRequests:{}",
- lookupServiceAddress, getLogEventTag(data), serviceUnit,
+ brokerId, getLogEventTag(data), serviceUnit,
data == null ? "" : data,
next == null ? "" : next,
handlerTotalCount, handlerFailureCount
@@ -747,7 +737,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
long handlerFailureCount = getHandlerFailureCounter(data).incrementAndGet();
log.error("{} failed to handle {} event for serviceUnit:{}, cur:{}, next:{}, "
+ "totalHandledRequests:{}, totalFailedRequests:{}",
- lookupServiceAddress, getLogEventTag(data), serviceUnit,
+ brokerId, getLogEventTag(data), serviceUnit,
data == null ? "" : data,
next == null ? "" : next,
handlerTotalCount, handlerFailureCount,
@@ -885,7 +875,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
if (broker == null) {
return false;
}
- return broker.equals(lookupServiceAddress);
+ return broker.equals(brokerId);
}
private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
@@ -1291,7 +1281,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS);
} catch (InterruptedException e) {
log.warn("Interrupted while delaying the next service unit clean-up. Cleaning broker:{}",
- lookupServiceAddress);
+ brokerId);
}
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java
index 468552db541..56238d6528e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java
@@ -42,14 +42,14 @@ public class IsolationPoliciesHelper {
return LoadManagerShared.applyNamespacePoliciesAsync(serviceUnit, policies,
availableBrokers.keySet(), new LoadManagerShared.BrokerTopicLoadingPredicate() {
@Override
- public boolean isEnablePersistentTopics(String brokerUrl) {
- BrokerLookupData lookupData = availableBrokers.get(brokerUrl.replace("http://", ""));
+ public boolean isEnablePersistentTopics(String brokerId) {
+ BrokerLookupData lookupData = availableBrokers.get(brokerId);
return lookupData != null && lookupData.persistentTopicsEnabled();
}
@Override
- public boolean isEnableNonPersistentTopics(String brokerUrl) {
- BrokerLookupData lookupData = availableBrokers.get(brokerUrl.replace("http://", ""));
+ public boolean isEnableNonPersistentTopics(String brokerId) {
+ BrokerLookupData lookupData = availableBrokers.get(brokerId);
return lookupData != null && lookupData.nonPersistentTopicsEnabled();
}
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java
index b07acfda7f7..3061969120b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java
@@ -55,7 +55,7 @@ public class BrokerLoadDataReporter implements LoadDataReporter<BrokerLoadData>,
private final BrokerHostUsage brokerHostUsage;
- private final String lookupServiceAddress;
+ private final String brokerId;
@Getter
private final BrokerLoadData localData;
@@ -67,10 +67,10 @@ public class BrokerLoadDataReporter implements LoadDataReporter<BrokerLoadData>,
private long tombstoneDelayInMillis;
public BrokerLoadDataReporter(PulsarService pulsar,
- String lookupServiceAddress,
+ String brokerId,
LoadDataStore<BrokerLoadData> brokerLoadDataStore) {
this.brokerLoadDataStore = brokerLoadDataStore;
- this.lookupServiceAddress = lookupServiceAddress;
+ this.brokerId = brokerId;
this.pulsar = pulsar;
this.conf = this.pulsar.getConfiguration();
if (SystemUtils.IS_OS_LINUX) {
@@ -111,7 +111,7 @@ public class BrokerLoadDataReporter implements LoadDataReporter<BrokerLoadData>,
log.info("publishing load report:{}", localData.toString(conf));
}
CompletableFuture<Void> future =
- this.brokerLoadDataStore.pushAsync(this.lookupServiceAddress, newLoadData);
+ this.brokerLoadDataStore.pushAsync(this.brokerId, newLoadData);
future.whenComplete((__, ex) -> {
if (ex == null) {
localData.setReportedAt(System.currentTimeMillis());
@@ -185,7 +185,7 @@ public class BrokerLoadDataReporter implements LoadDataReporter<BrokerLoadData>,
}
var lastSuccessfulTombstonedAt = lastTombstonedAt;
lastTombstonedAt = now; // dedup first
- brokerLoadDataStore.removeAsync(lookupServiceAddress)
+ brokerLoadDataStore.removeAsync(brokerId)
.whenComplete((__, e) -> {
if (e != null) {
log.error("Failed to clean broker load data.", e);
@@ -209,13 +209,13 @@ public class BrokerLoadDataReporter implements LoadDataReporter<BrokerLoadData>,
ServiceUnitState state = ServiceUnitStateData.state(data);
switch (state) {
case Releasing, Splitting -> {
- if (StringUtils.equals(data.sourceBroker(), lookupServiceAddress)) {
+ if (StringUtils.equals(data.sourceBroker(), brokerId)) {
localData.clear();
tombstone();
}
}
case Owned -> {
- if (StringUtils.equals(data.dstBroker(), lookupServiceAddress)) {
+ if (StringUtils.equals(data.dstBroker(), brokerId)) {
localData.clear();
tombstone();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java
index 0fa37d3687c..43e05ad1ac9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java
@@ -41,7 +41,7 @@ public class TopBundleLoadDataReporter implements LoadDataReporter<TopBundlesLoa
private final PulsarService pulsar;
- private final String lookupServiceAddress;
+ private final String brokerId;
private final LoadDataStore<TopBundlesLoadData> bundleLoadDataStore;
@@ -53,10 +53,10 @@ public class TopBundleLoadDataReporter implements LoadDataReporter<TopBundlesLoa
private long tombstoneDelayInMillis;
public TopBundleLoadDataReporter(PulsarService pulsar,
- String lookupServiceAddress,
+ String brokerId,
LoadDataStore<TopBundlesLoadData> bundleLoadDataStore) {
this.pulsar = pulsar;
- this.lookupServiceAddress = lookupServiceAddress;
+ this.brokerId = brokerId;
this.bundleLoadDataStore = bundleLoadDataStore;
this.lastBundleStatsUpdatedAt = 0;
this.topKBundles = new TopKBundles(pulsar);
@@ -88,7 +88,7 @@ public class TopBundleLoadDataReporter implements LoadDataReporter<TopBundlesLoa
if (ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log)) {
log.info("Reporting TopBundlesLoadData:{}", topKBundles.getLoadData());
}
- return this.bundleLoadDataStore.pushAsync(lookupServiceAddress, topKBundles.getLoadData())
+ return this.bundleLoadDataStore.pushAsync(brokerId, topKBundles.getLoadData())
.exceptionally(e -> {
log.error("Failed to report top-bundles load data.", e);
return null;
@@ -106,7 +106,7 @@ public class TopBundleLoadDataReporter implements LoadDataReporter<TopBundlesLoa
}
var lastSuccessfulTombstonedAt = lastTombstonedAt;
lastTombstonedAt = now; // dedup first
- bundleLoadDataStore.removeAsync(lookupServiceAddress)
+ bundleLoadDataStore.removeAsync(brokerId)
.whenComplete((__, e) -> {
if (e != null) {
log.error("Failed to clean broker load data.", e);
@@ -129,12 +129,12 @@ public class TopBundleLoadDataReporter implements LoadDataReporter<TopBundlesLoa
ServiceUnitState state = ServiceUnitStateData.state(data);
switch (state) {
case Releasing, Splitting -> {
- if (StringUtils.equals(data.sourceBroker(), lookupServiceAddress)) {
+ if (StringUtils.equals(data.sourceBroker(), brokerId)) {
tombstone();
}
}
case Owned -> {
- if (StringUtils.equals(data.dstBroker(), lookupServiceAddress)) {
+ if (StringUtils.equals(data.dstBroker(), brokerId)) {
tombstone();
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 5f2e4b1f25d..3d627db6cfa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -21,8 +21,6 @@ package org.apache.pulsar.broker.loadbalance.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
import io.netty.util.concurrent.FastThreadLocal;
-import java.net.MalformedURLException;
-import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -106,59 +104,56 @@ public class LoadManagerShared {
if (isIsolationPoliciesPresent) {
LOG.debug("Isolation Policies Present for namespace - [{}]", namespace.toString());
}
- for (final String broker : availableBrokers) {
- final String brokerUrlString = String.format("http://%s", broker);
- URL brokerUrl;
+ for (final String brokerId : availableBrokers) {
+ String brokerHost;
try {
- brokerUrl = new URL(brokerUrlString);
- } catch (MalformedURLException e) {
- LOG.error("Unable to parse brokerUrl from ResourceUnitId", e);
+ brokerHost = parseBrokerHost(brokerId);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Unable to parse host from {}", brokerId, e);
continue;
}
// todo: in future check if the resource unit has resources to take the namespace
if (isIsolationPoliciesPresent) {
// note: serviceUnitID is namespace name and ResourceID is brokerName
- if (policies.isPrimaryBroker(namespace, brokerUrl.getHost())) {
- primariesCache.add(broker);
+ if (policies.isPrimaryBroker(namespace, brokerHost)) {
+ primariesCache.add(brokerId);
if (LOG.isDebugEnabled()) {
LOG.debug("Added Primary Broker - [{}] as possible Candidates for"
- + " namespace - [{}] with policies", brokerUrl.getHost(), namespace.toString());
+ + " namespace - [{}] with policies", brokerHost, namespace.toString());
}
- } else if (policies.isSecondaryBroker(namespace, brokerUrl.getHost())) {
- secondaryCache.add(broker);
+ } else if (policies.isSecondaryBroker(namespace, brokerHost)) {
+ secondaryCache.add(brokerId);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Added Shared Broker - [{}] as possible "
+ "Candidates for namespace - [{}] with policies",
- brokerUrl.getHost(), namespace.toString());
+ brokerHost, namespace.toString());
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping Broker - [{}] not primary broker and not shared" + " for namespace - [{}] ",
- brokerUrl.getHost(), namespace.toString());
+ brokerHost, namespace.toString());
}
}
} else {
// non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
- if (isNonPersistentTopic
- && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
+ if (isNonPersistentTopic && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Filter broker- [{}] because it doesn't support non-persistent namespace - [{}]",
- brokerUrl.getHost(), namespace.toString());
+ brokerHost, namespace.toString());
}
- } else if (!isNonPersistentTopic
- && !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
+ } else if (!isNonPersistentTopic && !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerId)) {
// persistent topic can be assigned to only brokers that enabled for persistent-topic
if (LOG.isDebugEnabled()) {
LOG.debug("Filter broker- [{}] because broker only supports non-persistent namespace - [{}]",
- brokerUrl.getHost(), namespace.toString());
+ brokerHost, namespace.toString());
}
- } else if (policies.isSharedBroker(brokerUrl.getHost())) {
- secondaryCache.add(broker);
+ } else if (policies.isSharedBroker(brokerHost)) {
+ secondaryCache.add(brokerId);
if (LOG.isDebugEnabled()) {
LOG.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]",
- brokerUrl.getHost(), namespace.toString());
+ brokerHost, namespace.toString());
}
}
}
@@ -181,6 +176,16 @@ public class LoadManagerShared {
}
}
+ private static String parseBrokerHost(String brokerId) {
+ // use last index to support ipv6 addresses
+ int lastIdx = brokerId.lastIndexOf(':');
+ if (lastIdx > -1) {
+ return brokerId.substring(0, lastIdx);
+ } else {
+ throw new IllegalArgumentException("Invalid brokerId: " + brokerId);
+ }
+ }
+
public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
final ServiceUnitId serviceUnit, final SimpleResourceAllocationPolicies policies,
final Set<String> availableBrokers, final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
@@ -199,59 +204,57 @@ public class LoadManagerShared {
LOG.debug("Isolation Policies Present for namespace - [{}]", namespace.toString());
}
}
- for (final String broker : availableBrokers) {
- final String brokerUrlString = String.format("http://%s", broker);
- URL brokerUrl;
+ for (final String brokerId : availableBrokers) {
+ String brokerHost;
try {
- brokerUrl = new URL(brokerUrlString);
- } catch (MalformedURLException e) {
- LOG.error("Unable to parse brokerUrl from ResourceUnitId", e);
+ brokerHost = parseBrokerHost(brokerId);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Unable to parse host from {}", brokerId, e);
continue;
}
// todo: in future check if the resource unit has resources to take the namespace
if (isIsolationPoliciesPresent) {
// note: serviceUnitID is namespace name and ResourceID is brokerName
- if (policies.isPrimaryBroker(namespace, brokerUrl.getHost())) {
- primariesCache.add(broker);
+ if (policies.isPrimaryBroker(namespace, brokerHost)) {
+ primariesCache.add(brokerId);
if (LOG.isDebugEnabled()) {
LOG.debug("Added Primary Broker - [{}] as possible Candidates for"
- + " namespace - [{}] with policies", brokerUrl.getHost(), namespace.toString());
+ + " namespace - [{}] with policies", brokerHost, namespace.toString());
}
- } else if (policies.isSecondaryBroker(namespace, brokerUrl.getHost())) {
- secondaryCache.add(broker);
+ } else if (policies.isSecondaryBroker(namespace, brokerHost)) {
+ secondaryCache.add(brokerId);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Added Shared Broker - [{}] as possible "
+ "Candidates for namespace - [{}] with policies",
- brokerUrl.getHost(), namespace.toString());
+ brokerHost, namespace.toString());
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping Broker - [{}] not primary broker and not shared"
- + " for namespace - [{}] ", brokerUrl.getHost(), namespace.toString());
+ + " for namespace - [{}] ", brokerHost, namespace.toString());
}
}
} else {
// non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
- if (isNonPersistentTopic
- && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
+ if (isNonPersistentTopic && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Filter broker- [{}] because it doesn't support non-persistent namespace - [{}]",
- brokerUrl.getHost(), namespace.toString());
+ brokerId, namespace.toString());
}
- } else if (!isNonPersistentTopic
- && !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
+ } else if (!isNonPersistentTopic && !brokerTopicLoadingPredicate
+ .isEnablePersistentTopics(brokerId)) {
// persistent topic can be assigned to only brokers that enabled for persistent-topic
if (LOG.isDebugEnabled()) {
LOG.debug("Filter broker- [{}] because broker only supports non-persistent "
- + "namespace - [{}]", brokerUrl.getHost(), namespace.toString());
+ + "namespace - [{}]", brokerId, namespace.toString());
}
- } else if (policies.isSharedBroker(brokerUrl.getHost())) {
- secondaryCache.add(broker);
+ } else if (policies.isSharedBroker(brokerHost)) {
+ secondaryCache.add(brokerId);
if (LOG.isDebugEnabled()) {
LOG.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]",
- brokerUrl.getHost(), namespace.toString());
+ brokerHost, namespace.toString());
}
}
}
@@ -762,9 +765,9 @@ public class LoadManagerShared {
}
public interface BrokerTopicLoadingPredicate {
- boolean isEnablePersistentTopics(String brokerUrl);
+ boolean isEnablePersistentTopics(String brokerId);
- boolean isEnableNonPersistentTopics(String brokerUrl);
+ boolean isEnableNonPersistentTopics(String brokerId);
}
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 4ecdfefbdd0..974d75d60b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -208,15 +208,15 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
- public boolean isEnablePersistentTopics(String brokerUrl) {
- final BrokerData brokerData = loadData.getBrokerData().get(brokerUrl.replace("http://", ""));
+ public boolean isEnablePersistentTopics(String brokerId) {
+ final BrokerData brokerData = loadData.getBrokerData().get(brokerId);
return brokerData != null && brokerData.getLocalData() != null
&& brokerData.getLocalData().isPersistentTopicsEnabled();
}
@Override
- public boolean isEnableNonPersistentTopics(String brokerUrl) {
- final BrokerData brokerData = loadData.getBrokerData().get(brokerUrl.replace("http://", ""));
+ public boolean isEnableNonPersistentTopics(String brokerId) {
+ final BrokerData brokerData = loadData.getBrokerData().get(brokerId);
return brokerData != null && brokerData.getLocalData() != null
&& brokerData.getLocalData().isNonPersistentTopicsEnabled();
}
@@ -977,14 +977,14 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
localData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
localData.setLoadManagerClassName(conf.getLoadManagerClassName());
- String lookupServiceAddress = pulsar.getLookupServiceAddress();
- brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+ String brokerId = pulsar.getBrokerId();
+ brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + brokerId;
updateLocalBrokerData();
brokerDataLock = brokersData.acquireLock(brokerZnodePath, localData).join();
pulsarResources.getLoadBalanceResources()
.getBrokerTimeAverageDataResources()
- .updateTimeAverageBrokerData(lookupServiceAddress, new TimeAverageBrokerData())
+ .updateTimeAverageBrokerData(brokerId, new TimeAverageBrokerData())
.join();
updateAll();
} catch (Exception e) {
@@ -1212,7 +1212,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
if (StringUtils.isBlank(broker)) {
return this.bundleBrokerAffinityMap.remove(bundle);
}
- broker = broker.replaceFirst("http[s]?://", "");
return this.bundleBrokerAffinityMap.put(bundle, broker);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index 63bc7ab07fe..c8d81bda1bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.loadbalance.impl;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -32,7 +31,6 @@ import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
-import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
/**
* Wrapper class allowing classes of instance ModularLoadManager to be compatible with the interface LoadManager.
@@ -75,20 +73,6 @@ public class ModularLoadManagerWrapper implements LoadManager {
return leastLoadedBroker.map(this::buildBrokerResourceUnit);
}
- private String getBrokerWebServiceUrl(String broker) {
- LocalBrokerData localData = (loadManager).getBrokerLocalData(broker);
- if (localData != null) {
- return localData.getWebServiceUrlTls() != null ? localData.getWebServiceUrlTls()
- : localData.getWebServiceUrl();
- }
- return String.format("http://%s", broker);
- }
-
- private String getBrokerZnodeName(String broker, String webServiceUrl) {
- String scheme = webServiceUrl.substring(0, webServiceUrl.indexOf("://"));
- return String.format("%s://%s", scheme, broker);
- }
-
@Override
public List<Metrics> getLoadBalancingMetrics() {
return loadManager.getLoadBalancingMetrics();
@@ -149,10 +133,7 @@ public class ModularLoadManagerWrapper implements LoadManager {
}
private SimpleResourceUnit buildBrokerResourceUnit (String broker) {
- String webServiceUrl = getBrokerWebServiceUrl(broker);
- String brokerZnodeName = getBrokerZnodeName(broker, webServiceUrl);
- return new SimpleResourceUnit(webServiceUrl,
- new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
+ return new SimpleResourceUnit(broker, new PulsarResourceDescription());
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index ee60595a485..be0580808ca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -211,15 +211,15 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
.build();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
- public boolean isEnablePersistentTopics(String brokerUrl) {
- ResourceUnit ru = new SimpleResourceUnit(brokerUrl, new PulsarResourceDescription());
+ public boolean isEnablePersistentTopics(String brokerId) {
+ ResourceUnit ru = new SimpleResourceUnit(brokerId, new PulsarResourceDescription());
LoadReport loadReport = currentLoadReports.get(ru);
return loadReport != null && loadReport.isPersistentTopicsEnabled();
}
@Override
- public boolean isEnableNonPersistentTopics(String brokerUrl) {
- ResourceUnit ru = new SimpleResourceUnit(brokerUrl, new PulsarResourceDescription());
+ public boolean isEnableNonPersistentTopics(String brokerId) {
+ ResourceUnit ru = new SimpleResourceUnit(brokerId, new PulsarResourceDescription());
LoadReport loadReport = currentLoadReports.get(ru);
return loadReport != null && loadReport.isNonPersistentTopicsEnabled();
}
@@ -266,8 +266,8 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
@Override
public void start() throws PulsarServerException {
// Register the brokers in metadata store
- String lookupServiceAddress = pulsar.getLookupServiceAddress();
- String brokerLockPath = LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+ String brokerId = pulsar.getBrokerId();
+ String brokerLockPath = LOADBALANCE_BROKERS_ROOT + "/" + brokerId;
try {
LoadReport loadReport = null;
@@ -653,7 +653,6 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
*/
private synchronized void doLoadRanking() {
ResourceUnitRanking.setCpuUsageByMsgRate(this.realtimeCpuLoadFactor);
- String hostname = pulsar.getAdvertisedAddress();
String strategy = this.getLoadBalancerPlacementStrategy();
log.info("doLoadRanking - load balancing strategy: {}", strategy);
if (!currentLoadReports.isEmpty()) {
@@ -702,8 +701,8 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
}
// update metrics
- if (resourceUnit.getResourceId().contains(hostname)) {
- updateLoadBalancingMetrics(hostname, finalRank, ranking);
+ if (resourceUnit.getResourceId().equals(pulsar.getBrokerId())) {
+ updateLoadBalancingMetrics(pulsar.getAdvertisedAddress(), finalRank, ranking);
}
}
updateBrokerToNamespaceToBundle();
@@ -711,7 +710,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
this.resourceUnitRankings = newResourceUnitRankings;
} else {
log.info("Leader broker[{}] No ResourceUnits to rank this run, Using Old Ranking",
- pulsar.getSafeWebServiceAddress());
+ pulsar.getBrokerId());
}
}
@@ -855,7 +854,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
// Add preallocated bundle range so incoming bundles from the same namespace are not assigned to the
// same broker.
brokerToNamespaceToBundleRange
- .computeIfAbsent(selectedRU.getResourceId().replace("http://", ""),
+ .computeIfAbsent(selectedRU.getResourceId(),
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build())
@@ -876,7 +875,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
availableBrokersCache.clear();
for (final Set<ResourceUnit> resourceUnits : availableBrokers.values()) {
for (final ResourceUnit resourceUnit : resourceUnits) {
- availableBrokersCache.add(resourceUnit.getResourceId().replace("http://", ""));
+ availableBrokersCache.add(resourceUnit.getResourceId());
}
}
brokerCandidateCache.clear();
@@ -899,7 +898,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
final Long rank = entry.getKey();
final Set<ResourceUnit> resourceUnits = entry.getValue();
for (final ResourceUnit resourceUnit : resourceUnits) {
- if (brokerCandidateCache.contains(resourceUnit.getResourceId().replace("http://", ""))) {
+ if (brokerCandidateCache.contains(resourceUnit.getResourceId())) {
result.put(rank, resourceUnit);
}
}
@@ -928,8 +927,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
availableBrokers = new HashMap<>();
for (String broker : activeBrokers) {
- ResourceUnit resourceUnit = new SimpleResourceUnit(String.format("http://%s", broker),
- new PulsarResourceDescription());
+ ResourceUnit resourceUnit = new SimpleResourceUnit(broker, new PulsarResourceDescription());
availableBrokers.computeIfAbsent(0L, key -> new TreeSet<>()).add(resourceUnit);
}
log.info("Choosing at random from broker list: [{}]", availableBrokers.values());
@@ -956,7 +954,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
Iterator<Map.Entry<Long, ResourceUnit>> candidateIterator = finalCandidates.entries().iterator();
while (candidateIterator.hasNext()) {
Map.Entry<Long, ResourceUnit> candidate = candidateIterator.next();
- String candidateBrokerName = candidate.getValue().getResourceId().replace("http://", "");
+ String candidateBrokerName = candidate.getValue().getResourceId();
if (!activeBrokers.contains(candidateBrokerName)) {
candidateIterator.remove(); // Current candidate points to an inactive broker, so remove it
}
@@ -1005,8 +1003,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
try {
String key = String.format("%s/%s", LOADBALANCE_BROKERS_ROOT, broker);
LoadReport lr = loadReports.readLock(key).join().get();
- ResourceUnit ru = new SimpleResourceUnit(String.format("http://%s", lr.getName()),
- fromLoadReport(lr));
+ ResourceUnit ru = new SimpleResourceUnit(lr.getName(), fromLoadReport(lr));
this.currentLoadReports.put(ru, lr);
} catch (Exception e) {
log.warn("Error reading load report from Cache for broker - [{}], [{}]", broker, e);
@@ -1078,7 +1075,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
loadReport.setProtocols(pulsar.getProtocolDataToAdvertise());
loadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
loadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
- loadReport.setName(pulsar.getLookupServiceAddress());
+ loadReport.setName(pulsar.getBrokerId());
loadReport.setBrokerVersionString(pulsar.getBrokerVersion());
SystemResourceUsage systemResourceUsage = this.getSystemResourceUsage();
@@ -1121,8 +1118,8 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
loadReport.setAllocatedMsgRateIn(allocatedQuota.getMsgRateIn());
loadReport.setAllocatedMsgRateOut(allocatedQuota.getMsgRateOut());
- final ResourceUnit resourceUnit = new SimpleResourceUnit(
- String.format("http://%s", loadReport.getName()), fromLoadReport(loadReport));
+ final ResourceUnit resourceUnit =
+ new SimpleResourceUnit(loadReport.getName(), fromLoadReport(loadReport));
Set<String> preAllocatedBundles;
if (resourceUnitRankings.containsKey(resourceUnit)) {
preAllocatedBundles = resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles();
@@ -1277,7 +1274,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
final Set<String> preallocatedBundles = resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles();
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
- .computeIfAbsent(broker.replace("http://", ""),
+ .computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
@@ -1455,7 +1452,6 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
if (StringUtils.isBlank(broker)) {
return this.bundleBrokerAffinityMap.remove(bundle);
}
- broker = broker.replaceFirst("http[s]?://", "");
return this.bundleBrokerAffinityMap.put(bundle, broker);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 4a54d4e0908..d8c3fd169a2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -192,7 +192,7 @@ public class NamespaceService implements AutoCloseable {
return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
if (optResult.isPresent()) {
LOG.info("[{}] Redirect lookup request to {} for topic {}",
- pulsar.getSafeWebServiceAddress(), optResult.get(), topic);
+ pulsar.getBrokerId(), optResult.get(), topic);
return CompletableFuture.completedFuture(optResult);
}
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
@@ -298,7 +298,7 @@ public class NamespaceService implements AutoCloseable {
return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
if (optResult.isPresent()) {
LOG.info("[{}] Redirect lookup request to {} for topic {}",
- pulsar.getSafeWebServiceAddress(), optResult.get(), topic);
+ pulsar.getBrokerId(), optResult.get(), topic);
try {
LookupData lookupData = optResult.get().getLookupData();
final String redirectUrl = options.isRequestHttps()
@@ -338,17 +338,17 @@ public class NamespaceService implements AutoCloseable {
* @throws PulsarServerException if an unexpected error occurs
*/
public void registerBootstrapNamespaces() throws PulsarServerException {
- String lookupServiceAddress = pulsar.getLookupServiceAddress();
+ String brokerId = pulsar.getBrokerId();
// ensure that we own the heartbeat namespace
- if (registerNamespace(getHeartbeatNamespace(lookupServiceAddress, config), true)) {
+ if (registerNamespace(getHeartbeatNamespace(brokerId, config), true)) {
LOG.info("added heartbeat namespace name in local cache: ns={}",
- getHeartbeatNamespace(lookupServiceAddress, config));
+ getHeartbeatNamespace(brokerId, config));
}
// ensure that we own the heartbeat namespace
- if (registerNamespace(getHeartbeatNamespaceV2(lookupServiceAddress, config), true)) {
+ if (registerNamespace(getHeartbeatNamespaceV2(brokerId, config), true)) {
LOG.info("added heartbeat namespace name in local cache: ns={}",
- getHeartbeatNamespaceV2(lookupServiceAddress, config));
+ getHeartbeatNamespaceV2(brokerId, config));
}
// we may not need strict ownership checking for bootstrap names for now
@@ -506,7 +506,6 @@ public class NamespaceService implements AutoCloseable {
return;
}
String candidateBroker;
- String candidateBrokerAdvertisedAddr = null;
LeaderElectionService les = pulsar.getLeaderElectionService();
if (les == null) {
@@ -541,14 +540,14 @@ public class NamespaceService implements AutoCloseable {
if (options.isAuthoritative()) {
// leader broker already assigned the current broker as owner
- candidateBroker = pulsar.getSafeWebServiceAddress();
+ candidateBroker = pulsar.getBrokerId();
} else {
LoadManager loadManager = this.loadManager.get();
boolean makeLoadManagerDecisionOnThisBroker = !loadManager.isCentralized() || les.isLeader();
if (!makeLoadManagerDecisionOnThisBroker) {
// If leader is not active, fallback to pick the least loaded from current broker loadmanager
boolean leaderBrokerActive = currentLeader.isPresent()
- && isBrokerActive(currentLeader.get().getServiceUrl());
+ && isBrokerActive(currentLeader.get().getBrokerId());
if (!leaderBrokerActive) {
makeLoadManagerDecisionOnThisBroker = true;
if (currentLeader.isEmpty()) {
@@ -567,7 +566,7 @@ public class NamespaceService implements AutoCloseable {
}
}
if (makeLoadManagerDecisionOnThisBroker) {
- Optional<Pair<String, String>> availableBroker = getLeastLoadedFromLoadManager(bundle);
+ Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (availableBroker.isEmpty()) {
LOG.warn("Load manager didn't return any available broker. "
+ "Returning empty result to lookup. NamespaceBundle[{}]",
@@ -575,12 +574,11 @@ public class NamespaceService implements AutoCloseable {
lookupFuture.complete(Optional.empty());
return;
}
- candidateBroker = availableBroker.get().getLeft();
- candidateBrokerAdvertisedAddr = availableBroker.get().getRight();
+ candidateBroker = availableBroker.get();
authoritativeRedirect = true;
} else {
// forward to leader broker to make assignment
- candidateBroker = currentLeader.get().getServiceUrl();
+ candidateBroker = currentLeader.get().getBrokerId();
}
}
}
@@ -593,7 +591,7 @@ public class NamespaceService implements AutoCloseable {
try {
Objects.requireNonNull(candidateBroker);
- if (candidateBroker.equals(pulsar.getSafeWebServiceAddress())) {
+ if (candidateBroker.equals(pulsar.getBrokerId())) {
// Load manager decided that the local broker should try to become the owner
ownershipCache.tryAcquiringOwnership(bundle).thenAccept(ownerInfo -> {
if (ownerInfo.isDisabled()) {
@@ -644,8 +642,7 @@ public class NamespaceService implements AutoCloseable {
}
// Now setting the redirect url
- createLookupResult(candidateBrokerAdvertisedAddr == null ? candidateBroker
- : candidateBrokerAdvertisedAddr, authoritativeRedirect, options.getAdvertisedListenerName())
+ createLookupResult(candidateBroker, authoritativeRedirect, options.getAdvertisedListenerName())
.thenAccept(lookupResult -> lookupFuture.complete(Optional.of(lookupResult)))
.exceptionally(ex -> {
lookupFuture.completeExceptionally(ex);
@@ -665,7 +662,7 @@ public class NamespaceService implements AutoCloseable {
CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
try {
checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null %s", candidateBroker);
- String path = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + parseHostAndPort(candidateBroker);
+ String path = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + candidateBroker;
localBrokerDataCache.get(path).thenAccept(reportData -> {
if (reportData.isPresent()) {
@@ -702,29 +699,19 @@ public class NamespaceService implements AutoCloseable {
}
public boolean isBrokerActive(String candidateBroker) {
- String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
Set<String> availableBrokers = getAvailableBrokers();
- if (availableBrokers.contains(candidateBrokerHostAndPort)) {
+ if (availableBrokers.contains(candidateBroker)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Broker {} ({}) is available for.", candidateBroker, candidateBrokerHostAndPort);
+ LOG.debug("Broker {} is available for.", candidateBroker);
}
return true;
} else {
- LOG.warn("Broker {} ({}) couldn't be found in available brokers {}",
- candidateBroker, candidateBrokerHostAndPort,
- String.join(",", availableBrokers));
+ LOG.warn("Broker {} couldn't be found in available brokers {}",
+ candidateBroker, String.join(",", availableBrokers));
return false;
}
}
- private static String parseHostAndPort(String candidateBroker) {
- int uriSeparatorPos = candidateBroker.indexOf("://");
- if (uriSeparatorPos == -1) {
- throw new IllegalArgumentException("'" + candidateBroker + "' isn't an URI.");
- }
- return candidateBroker.substring(uriSeparatorPos + 3);
- }
-
private Set<String> getAvailableBrokers() {
try {
return loadManager.get().getAvailableBrokers();
@@ -740,7 +727,7 @@ public class NamespaceService implements AutoCloseable {
* @return the least loaded broker addresses
* @throws Exception if an error occurs
*/
- private Optional<Pair<String, String>> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
+ private Optional<String> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
Optional<ResourceUnit> leastLoadedBroker = loadManager.get().getLeastLoaded(serviceUnit);
if (leastLoadedBroker.isEmpty()) {
LOG.warn("No broker is available for {}", serviceUnit);
@@ -748,15 +735,13 @@ public class NamespaceService implements AutoCloseable {
}
String lookupAddress = leastLoadedBroker.get().getResourceId();
- String advertisedAddr = (String) leastLoadedBroker.get()
- .getProperty(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME);
if (LOG.isDebugEnabled()) {
LOG.debug("{} : redirecting to the least loaded broker, lookup address={}",
- pulsar.getSafeWebServiceAddress(),
+ pulsar.getBrokerId(),
lookupAddress);
}
- return Optional.of(Pair.of(lookupAddress, advertisedAddr));
+ return Optional.of(lookupAddress);
}
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle) {
@@ -1564,7 +1549,7 @@ public class NamespaceService implements AutoCloseable {
}
public void unloadSLANamespace() throws Exception {
- NamespaceName namespaceName = getSLAMonitorNamespace(pulsar.getLookupServiceAddress(), config);
+ NamespaceName namespaceName = getSLAMonitorNamespace(pulsar.getBrokerId(), config);
LOG.info("Checking owner for SLA namespace {}", namespaceName);
@@ -1597,7 +1582,7 @@ public class NamespaceService implements AutoCloseable {
Matcher m = HEARTBEAT_NAMESPACE_PATTERN.matcher(ns.getNamespaceObject().toString());
if (m.matches()) {
LOG.debug("Heartbeat namespace matched the lookup namespace {}", ns.getNamespaceObject().toString());
- return String.format("http://%s", m.group(1));
+ return m.group(1);
} else {
return null;
}
@@ -1607,7 +1592,7 @@ public class NamespaceService implements AutoCloseable {
Matcher m = HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(ns.getNamespaceObject().toString());
if (m.matches()) {
LOG.debug("Heartbeat namespace v2 matched the lookup namespace {}", ns.getNamespaceObject().toString());
- return String.format("http://%s", m.group(1));
+ return m.group(1);
} else {
return null;
}
@@ -1616,7 +1601,7 @@ public class NamespaceService implements AutoCloseable {
public static String getSLAMonitorBrokerName(ServiceUnitId ns) {
Matcher m = SLA_NAMESPACE_PATTERN.matcher(ns.getNamespaceObject().toString());
if (m.matches()) {
- return String.format("http://%s", m.group(1));
+ return m.group(1);
} else {
return null;
}
@@ -1647,16 +1632,16 @@ public class NamespaceService implements AutoCloseable {
}
public boolean registerSLANamespace() throws PulsarServerException {
- String lookupServiceAddress = pulsar.getLookupServiceAddress();
- boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(lookupServiceAddress, config), false);
+ String brokerId = pulsar.getBrokerId();
+ boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(brokerId, config), false);
if (isNameSpaceRegistered) {
if (LOG.isDebugEnabled()) {
LOG.debug("Added SLA Monitoring namespace name in local cache: ns={}",
- getSLAMonitorNamespace(lookupServiceAddress, config));
+ getSLAMonitorNamespace(brokerId, config));
}
} else if (LOG.isDebugEnabled()) {
LOG.debug("SLA Monitoring not owned by the broker: ns={}",
- getSLAMonitorNamespace(lookupServiceAddress, config));
+ getSLAMonitorNamespace(brokerId, config));
}
return isNameSpaceRegistered;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8642815430a..62197900076 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2112,7 +2112,7 @@ public class BrokerService implements Closeable {
} else {
String msg = String.format("Namespace bundle for topic (%s) not served by this instance:%s. "
+ "Please redo the lookup. Request is denied: namespace=%s",
- topic, pulsar.getLookupServiceAddress(), topicName.getNamespace());
+ topic, pulsar.getBrokerId(), topicName.getNamespace());
log.warn(msg);
return FutureUtil.failedFuture(new ServiceUnitNotReadyException(msg));
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 00cf3a6583b..2fa85f262de 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -968,7 +968,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
});
stats.topicEpoch = topicEpoch.orElse(null);
- stats.ownerBroker = brokerService.pulsar().getLookupServiceAddress();
+ stats.ownerBroker = brokerService.pulsar().getBrokerId();
future.complete(stats);
return future;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6194d76b3b7..4e7a1392c83 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2409,7 +2409,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
stats.backlogSize = ledger.getEstimatedBacklogSize();
stats.deduplicationStatus = messageDeduplication.getStatus().toString();
stats.topicEpoch = topicEpoch.orElse(null);
- stats.ownerBroker = brokerService.pulsar().getLookupServiceAddress();
+ stats.ownerBroker = brokerService.pulsar().getBrokerId();
stats.offloadedStorageSize = ledger.getOffloadedSize();
stats.lastOffloadLedgerId = ledger.getLastOffloadedLedgerId();
stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index e8192cde3fd..e23286ae449 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -56,7 +56,9 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.BookieResources;
@@ -93,6 +95,7 @@ import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.path.PolicyPath;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
@@ -1199,24 +1202,43 @@ public abstract class PulsarWebResource {
/**
* Redirect the call to the specified broker.
*
- * @param broker
- * Broker name
+ * @param brokerId broker's id (lookup service address)
*/
- protected void validateBrokerName(String broker) {
- String brokerUrl = String.format("http://%s", broker);
- String brokerUrlTls = String.format("https://%s", broker);
- if (!brokerUrl.equals(pulsar().getWebServiceAddress())
- && !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
- String[] parts = broker.split(":");
- checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker));
- String host = parts[0];
- int port = Integer.parseInt(parts[1]);
-
- URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(host).port(port).build();
- log.debug("[{}] Redirecting the rest call to {}: broker={}", clientAppId(), redirect, broker);
- throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
-
+ protected CompletableFuture<Void> maybeRedirectToBroker(String brokerId) {
+ // backwards compatibility: accept broker ids that still include an http:// or https:// scheme prefix
+ String cleanedBrokerId = brokerId.replaceFirst("http[s]?://", "");
+ if (pulsar.getBrokerId().equals(cleanedBrokerId)
+ // backwards compatibility: also accept an id that matches this broker's web service address
+ || ("http://" + cleanedBrokerId).equals(pulsar().getWebServiceAddress())
+ || ("https://" + cleanedBrokerId).equals(pulsar().getWebServiceAddressTls())) {
+ // no need to redirect, the current broker matches the given broker id
+ return CompletableFuture.completedFuture(null);
}
+ LockManager<BrokerLookupData> brokerLookupDataLockManager =
+ pulsar().getCoordinationService().getLockManager(BrokerLookupData.class);
+ return brokerLookupDataLockManager.readLock(LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + cleanedBrokerId)
+ .thenAccept(brokerLookupDataOptional -> {
+ if (brokerLookupDataOptional.isEmpty()) {
+ throw new RestException(Status.NOT_FOUND,
+ "Broker id '" + brokerId + "' not found in available brokers.");
+ }
+ brokerLookupDataOptional.ifPresent(brokerLookupData -> {
+ URI targetBrokerUri;
+ if ((isRequestHttps() || StringUtils.isBlank(brokerLookupData.getWebServiceUrl()))
+ && StringUtils.isNotBlank(brokerLookupData.getWebServiceUrlTls())) {
+ targetBrokerUri = URI.create(brokerLookupData.getWebServiceUrlTls());
+ } else {
+ targetBrokerUri = URI.create(brokerLookupData.getWebServiceUrl());
+ }
+ URI redirect = UriBuilder.fromUri(uri.getRequestUri())
+ .scheme(targetBrokerUri.getScheme())
+ .host(targetBrokerUri.getHost())
+ .port(targetBrokerUri.getPort()).build();
+ log.debug("[{}] Redirecting the rest call to {}: broker={}", clientAppId(), redirect,
+ cleanedBrokerId);
+ throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+ });
+ });
}
public void validateTopicPolicyOperation(TopicName topicName, PolicyName policy, PolicyOperation operation) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index 47949d7312b..4a6524bf245 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -102,9 +102,9 @@ public class SLAMonitoringTest {
createTenant(pulsarAdmins[BROKER_COUNT - 1]);
for (int i = 0; i < BROKER_COUNT; i++) {
- String topic = String.format("%s/%s/%s:%s", NamespaceService.SLA_NAMESPACE_PROPERTY, "my-cluster",
- pulsarServices[i].getAdvertisedAddress(), brokerWebServicePorts[i]);
- pulsarAdmins[0].namespaces().createNamespace(topic);
+ var namespaceName = NamespaceService.getSLAMonitorNamespace(pulsarServices[i].getBrokerId(),
+ pulsarServices[i].getConfig());
+ pulsarAdmins[0].namespaces().createNamespace(namespaceName.toString());
}
}
@@ -173,9 +173,9 @@ public class SLAMonitoringTest {
public void testOwnershipViaAdminAfterSetup() {
for (int i = 0; i < BROKER_COUNT; i++) {
try {
- String topic = String.format("persistent://%s/%s/%s:%s/%s",
- NamespaceService.SLA_NAMESPACE_PROPERTY, "my-cluster", pulsarServices[i].getAdvertisedAddress(),
- brokerWebServicePorts[i], "my-topic");
+ String topic = String.format("persistent://%s/%s/%s/%s",
+ NamespaceService.SLA_NAMESPACE_PROPERTY, "my-cluster",
+ pulsarServices[i].getBrokerId(), "my-topic");
assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic),
"pulsar://" + pulsarServices[i].getAdvertisedAddress() + ":" + brokerNativeBrokerPorts[i]);
} catch (Exception e) {
@@ -199,8 +199,8 @@ public class SLAMonitoringTest {
fail("Should be a able to close the broker index " + crashIndex + " Exception: " + e);
}
- String topic = String.format("persistent://%s/%s/%s:%s/%s", NamespaceService.SLA_NAMESPACE_PROPERTY,
- "my-cluster", pulsarServices[crashIndex].getAdvertisedAddress(), brokerWebServicePorts[crashIndex],
+ String topic = String.format("persistent://%s/%s/%s/%s", NamespaceService.SLA_NAMESPACE_PROPERTY,
+ "my-cluster", pulsarServices[crashIndex].getBrokerId(),
"my-topic");
log.info("Lookup for namespace {}", topic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java
index de4cf9658b2..7c9154a27ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java
@@ -81,9 +81,8 @@ public class AdminApiMultiBrokersTest extends MultiBrokerBaseTest {
assertTrue(leaderBroker.isPresent());
log.info("Leader broker is {}", leaderBroker);
for (PulsarAdmin admin : getAllAdmins()) {
- String serviceUrl = admin.brokers().getLeaderBroker().getServiceUrl();
- log.info("Pulsar admin get leader broker is {}", serviceUrl);
- assertEquals(leaderBroker.get().getServiceUrl(), serviceUrl);
+ String brokerId = admin.brokers().getLeaderBroker().getBrokerId();
+ assertEquals(leaderBroker.get().getBrokerId(), brokerId);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 0df37835670..93cf369f7dd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -333,10 +333,12 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
} catch (PulsarAdminException e) {
assertTrue(e instanceof PreconditionFailedException);
}
+
+ restartBroker();
}
@Test
- public void clusterNamespaceIsolationPolicies() throws PulsarAdminException {
+ public void clusterNamespaceIsolationPolicies() throws Exception {
try {
// create
String policyName1 = "policy-1";
@@ -512,6 +514,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
// Ok
}
+ restartBroker();
}
@Test
@@ -529,7 +532,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(list2.size(), 1);
BrokerInfo leaderBroker = admin.brokers().getLeaderBroker();
- Assert.assertEquals(leaderBroker.getServiceUrl(), pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getServiceUrl).get());
+ Assert.assertEquals(leaderBroker.getBrokerId(),
+ pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getBrokerId).get());
Map<String, NamespaceOwnershipStatus> nsMap = admin.brokers().getOwnedNamespaces("test", list.get(0));
// since sla-monitor ns is not created nsMap.size() == 1 (for HeartBeat Namespace)
@@ -537,7 +541,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
for (String ns : nsMap.keySet()) {
NamespaceOwnershipStatus nsStatus = nsMap.get(ns);
if (ns.equals(
- NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), pulsar.getConfiguration())
+ NamespaceService.getHeartbeatNamespace(pulsar.getBrokerId(), pulsar.getConfiguration())
+ "/0x00000000_0xffffffff")) {
assertEquals(nsStatus.broker_assignment, BrokerAssignment.shared);
assertFalse(nsStatus.is_controlled);
@@ -703,6 +707,10 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
Awaitility.await().until(() -> pulsar.getConfiguration().getBrokerShutdownTimeoutMs() == newValue);
// verify value is updated
assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), newValue);
+ // reset config
+ pulsar.getConfiguration().setBrokerShutdownTimeoutMs(0L);
+ // restart broker
+ restartBroker();
}
/**
@@ -801,6 +809,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"),
Set.of("test", "usw"));
admin.tenants().updateTenant("prop-xyz", tenantInfo);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin.tenants().getTenantInfo("prop-xyz").getAllowedClusters(), Set.of("test", "usw")));
assertEquals(admin.namespaces().getPolicies("prop-xyz/ns1").bundles, PoliciesUtil.defaultBundle());
@@ -3191,6 +3201,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"),
Set.of("test", "usw"));
admin.tenants().updateTenant("prop-xyz", tenantInfo);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin.tenants().getTenantInfo("prop-xyz").getAllowedClusters(),
+ tenantInfo.getAllowedClusters()));
admin.namespaces().createNamespace("prop-xyz/getBundleNs", 100);
assertEquals(admin.namespaces().getPolicies("prop-xyz/getBundleNs").bundles.getNumBundles(), 100);
@@ -3384,6 +3397,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"),
Set.of("test", "usw"));
admin.tenants().updateTenant("prop-xyz", tenantInfo);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin.tenants().getTenantInfo("prop-xyz").getAllowedClusters(),
+ tenantInfo.getAllowedClusters()));
String ns = BrokerTestUtil.newUniqueName("prop-xyz/ns");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 3caeb591bc8..8a83682c1d2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -726,7 +726,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
Object leaderBrokerRes = asyncRequests(ctx -> brokers.getLeaderBroker(ctx));
assertTrue(leaderBrokerRes instanceof BrokerInfo);
BrokerInfo leaderBroker = (BrokerInfo)leaderBrokerRes;
- assertEquals(leaderBroker.getServiceUrl(), pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getServiceUrl).get());
+ assertEquals(leaderBroker.getBrokerId(),
+ pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getBrokerId).get());
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index d9f2d41a30b..34bc1fa9a6a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -452,7 +452,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
for (String ns : nsMap.keySet()) {
NamespaceOwnershipStatus nsStatus = nsMap.get(ns);
if (ns.equals(
- NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), pulsar.getConfiguration())
+ NamespaceService.getHeartbeatNamespace(pulsar.getBrokerId(), pulsar.getConfiguration())
+ "/0x00000000_0xffffffff")) {
assertEquals(nsStatus.broker_assignment, BrokerAssignment.shared);
assertFalse(nsStatus.is_controlled);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersMultiBrokerLeaderElectionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersMultiBrokerLeaderElectionTest.java
new file mode 100644
index 00000000000..5adc78b2c52
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersMultiBrokerLeaderElectionTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance;
+
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class AdvertisedListenersMultiBrokerLeaderElectionTest extends MultiBrokerLeaderElectionTest {
+ @Override
+ protected PulsarTestContext.Builder createPulsarTestContextBuilder(ServiceConfiguration conf) {
+ conf.setWebServicePortTls(Optional.of(0));
+ return super.createPulsarTestContextBuilder(conf).preallocatePorts(true).configOverride(config -> {
+ // use an advertised address ("localhost") that differs from the host ("127.0.0.1") used in the advertised listeners
+ config.setAdvertisedAddress("localhost");
+ config.setAdvertisedListeners(
+ "public_pulsar:pulsar://127.0.0.1:" + config.getBrokerServicePort().get()
+ + ",public_http:http://127.0.0.1:" + config.getWebServicePort().get()
+ + ",public_https:https://127.0.0.1:" + config.getWebServicePortTls().get());
+ });
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index 008897136f8..ded4ee8e58d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -115,7 +115,8 @@ public class LeaderElectionServiceTest {
checkLookupException(tenant, namespace, client);
// broker, webService and leaderElectionService is started, and elect is done;
- leaderBrokerReference.set(new LeaderBroker(pulsar.getWebServiceAddress()));
+ leaderBrokerReference.set(
+ new LeaderBroker(pulsar.getBrokerId(), pulsar.getSafeWebServiceAddress()));
Producer<byte[]> producer = client.newProducer()
.topic("persistent://" + tenant + "/" + namespace + "/1p")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index 50afb71ea09..e4a66b1201c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -133,7 +133,7 @@ public class LoadBalancerTest {
brokerNativeBrokerPorts[i] = pulsarServices[i].getBrokerListenPort().get();
brokerUrls[i] = new URL("http://127.0.0.1" + ":" + brokerWebServicePorts[i]);
- lookupAddresses[i] = pulsarServices[i].getAdvertisedAddress() + ":" + pulsarServices[i].getListenPortHTTP().get();
+ lookupAddresses[i] = pulsarServices[i].getBrokerId();
pulsarAdmins[i] = PulsarAdmin.builder().serviceHttpUrl(brokerUrls[i].toString()).build();
}
@@ -401,7 +401,7 @@ public class LoadBalancerTest {
double expectedMaxVariation = 10.0;
for (int i = 0; i < BROKER_COUNT; i++) {
long actualValue = 0;
- String resourceId = "http://" + lookupAddresses[i];
+ String resourceId = lookupAddresses[i];
if (namespaceOwner.containsKey(resourceId)) {
actualValue = namespaceOwner.get(resourceId);
}
@@ -681,7 +681,7 @@ public class LoadBalancerTest {
}
}
// Make sure all brokers see the same leader
- log.info("Old leader is : {}", oldLeader.getServiceUrl());
+ log.info("Old leader is : {}", oldLeader.getBrokerId());
for (PulsarService pulsar : activePulsar) {
log.info("Current leader for {} is : {}", pulsar.getWebServiceAddress(), pulsar.getLeaderElectionService().getCurrentLeader());
assertEquals(pulsar.getLeaderElectionService().readCurrentLeader().join(), Optional.of(oldLeader));
@@ -691,7 +691,7 @@ public class LoadBalancerTest {
leaderPulsar.close();
loopUntilLeaderChangesForAllBroker(followerPulsar, oldLeader);
LeaderBroker newLeader = followerPulsar.get(0).getLeaderElectionService().readCurrentLeader().join().get();
- log.info("New leader is : {}", newLeader.getServiceUrl());
+ log.info("New leader is : {}", newLeader.getBrokerId());
Assert.assertNotEquals(newLeader, oldLeader);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
index 5c840129dd8..a7eaffc147b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.loadbalance;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -36,14 +37,24 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.MultiBrokerTestZKBaseTest;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.admin.Lookup;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker")
public class MultiBrokerLeaderElectionTest extends MultiBrokerTestZKBaseTest {
+ public MultiBrokerLeaderElectionTest() {
+ super();
+ this.isTcpLookup = true;
+ }
+
@Override
protected int numberOfAdditionalBrokers() {
return 9;
@@ -88,39 +99,80 @@ public class MultiBrokerLeaderElectionTest extends MultiBrokerTestZKBaseTest {
});
}
- @Test
- public void shouldProvideConsistentAnswerToTopicLookups()
+ @Test(timeOut = 60000L)
+ public void shouldProvideConsistentAnswerToTopicLookupsUsingAdminApi()
throws PulsarAdminException, ExecutionException, InterruptedException {
- String topicNameBase = "persistent://public/default/lookuptest" + UUID.randomUUID() + "-";
+ String namespace = "public/ns" + UUID.randomUUID();
+ admin.namespaces().createNamespace(namespace, 256);
+ String topicNameBase = "persistent://" + namespace + "/lookuptest-";
List<String> topicNames = IntStream.range(0, 500).mapToObj(i -> topicNameBase + i)
.collect(Collectors.toList());
List<PulsarAdmin> allAdmins = getAllAdmins();
@Cleanup("shutdown")
ExecutorService executorService = Executors.newFixedThreadPool(allAdmins.size());
List<Future<List<String>>> resultFutures = new ArrayList<>();
- String leaderBrokerUrl = admin.brokers().getLeaderBroker().getServiceUrl();
- log.info("LEADER is {}", leaderBrokerUrl);
// use Phaser to increase the chances of a race condition by triggering all threads once
- // they are waiting just before the lookupTopic call
+ // they are waiting just before each lookupTopic call
final Phaser phaser = new Phaser(1);
for (PulsarAdmin brokerAdmin : allAdmins) {
- if (!leaderBrokerUrl.equals(brokerAdmin.getServiceUrl())) {
- phaser.register();
- log.info("Doing lookup to broker {}", brokerAdmin.getServiceUrl());
- resultFutures.add(executorService.submit(() -> {
- phaser.arriveAndAwaitAdvance();
- return topicNames.stream().map(topicName -> {
- try {
- return brokerAdmin.lookups().lookupTopic(topicName);
- } catch (PulsarAdminException e) {
- log.error("Error looking up topic {} in {}", topicName, brokerAdmin.getServiceUrl());
- throw new RuntimeException(e);
- }
- }).collect(Collectors.toList());
- }));
+ phaser.register();
+ Lookup lookups = brokerAdmin.lookups();
+ log.info("Doing lookup to broker {}", brokerAdmin.getServiceUrl());
+ resultFutures.add(executorService.submit(() -> topicNames.stream().map(topicName -> {
+ phaser.arriveAndAwaitAdvance();
+ try {
+ return lookups.lookupTopic(topicName);
+ } catch (PulsarAdminException e) {
+ log.error("Error looking up topic {} in {}", topicName, brokerAdmin.getServiceUrl());
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList())));
+ }
+ phaser.arriveAndDeregister();
+ List<String> firstResult = null;
+ for (Future<List<String>> resultFuture : resultFutures) {
+ List<String> result = resultFuture.get();
+ if (firstResult == null) {
+ firstResult = result;
+ } else {
+ assertEquals(result, firstResult, "The lookup results weren't consistent.");
}
}
- phaser.arriveAndAwaitAdvance();
+ }
+
+ @Test(timeOut = 60000L)
+ public void shouldProvideConsistentAnswerToTopicLookupsUsingClient()
+ throws PulsarAdminException, ExecutionException, InterruptedException {
+ String namespace = "public/ns" + UUID.randomUUID();
+ admin.namespaces().createNamespace(namespace, 256);
+ String topicNameBase = "persistent://" + namespace + "/lookuptest-";
+ List<String> topicNames = IntStream.range(0, 500).mapToObj(i -> topicNameBase + i)
+ .collect(Collectors.toList());
+ List<PulsarClient> allClients = getAllClients();
+ @Cleanup("shutdown")
+ ExecutorService executorService = Executors.newFixedThreadPool(allClients.size());
+ List<Future<List<String>>> resultFutures = new ArrayList<>();
+ // use Phaser to increase the chances of a race condition by triggering all threads once
+ // they are waiting just before each lookupTopic call
+ final Phaser phaser = new Phaser(1);
+ for (PulsarClient brokerClient : allClients) {
+ phaser.register();
+ String serviceUrl = ((PulsarClientImpl) brokerClient).getConfiguration().getServiceUrl();
+ LookupService lookupService = ((PulsarClientImpl) brokerClient).getLookup();
+ log.info("Doing lookup to broker {}", serviceUrl);
+ resultFutures.add(executorService.submit(() -> topicNames.stream().map(topicName -> {
+ phaser.arriveAndAwaitAdvance();
+ try {
+ InetSocketAddress logicalAddress =
+ lookupService.getBroker(TopicName.get(topicName)).get().getLogicalAddress();
+ return logicalAddress.getHostString() + ":" + logicalAddress.getPort();
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Error looking up topic {} in {}", topicName, serviceUrl);
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList())));
+ }
+ phaser.arriveAndDeregister();
List<String> firstResult = null;
for (Future<List<String>> resultFuture : resultFutures) {
List<String> result = resultFuture.get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 43706129fbe..f6154e3ec8e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -24,7 +24,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -213,7 +213,7 @@ public class SimpleLoadManagerImplTest {
rd.put("bandwidthIn", new ResourceUsage(250 * 1024, 1024 * 1024));
rd.put("bandwidthOut", new ResourceUsage(550 * 1024, 1024 * 1024));
- ResourceUnit ru1 = new SimpleResourceUnit("http://prod2-broker7.messaging.usw.example.com:8080", rd);
+ ResourceUnit ru1 = new SimpleResourceUnit("prod2-broker7.messaging.usw.example.com:8080", rd);
Set<ResourceUnit> rus = new HashSet<>();
rus.add(ru1);
LoadRanker lr = new ResourceAvailabilityRanker();
@@ -249,15 +249,15 @@ public class SimpleLoadManagerImplTest {
rd.put("bandwidthIn", new ResourceUsage(250 * 1024, 1024 * 1024));
rd.put("bandwidthOut", new ResourceUsage(550 * 1024, 1024 * 1024));
- ResourceUnit ru1 = new SimpleResourceUnit(
- "http://" + pulsar1.getAdvertisedAddress() + ":" + pulsar1.getConfiguration().getWebServicePort().get(), rd);
+ ResourceUnit ru1 = new SimpleResourceUnit(pulsar1.getBrokerId(), rd);
Set<ResourceUnit> rus = new HashSet<>();
rus.add(ru1);
LoadRanker lr = new ResourceAvailabilityRanker();
// inject the load report and rankings
Map<ResourceUnit, org.apache.pulsar.policies.data.loadbalancer.LoadReport> loadReports = new HashMap<>();
- org.apache.pulsar.policies.data.loadbalancer.LoadReport loadReport = new org.apache.pulsar.policies.data.loadbalancer.LoadReport();
+ org.apache.pulsar.policies.data.loadbalancer.LoadReport loadReport =
+ new org.apache.pulsar.policies.data.loadbalancer.LoadReport();
loadReport.setSystemResourceUsage(new SystemResourceUsage());
loadReports.put(ru1, loadReport);
setObjectField(SimpleLoadManagerImpl.class, loadManager, "currentLoadReports", loadReports);
@@ -272,10 +272,9 @@ public class SimpleLoadManagerImplTest {
sortedRankingsInstance.get().put(lr.getRank(rd), rus);
setObjectField(SimpleLoadManagerImpl.class, loadManager, "sortedRankings", sortedRankingsInstance);
- final Optional<ResourceUnit> leastLoaded = loadManager.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10"));
- // broker is not active so found should be null
- assertFalse(leastLoaded.isPresent());
-
+ ResourceUnit found = loadManager.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")).get();
+ // TODO: this test doesn't make sense. This was the original assertion.
+ assertNotEquals(found, null, "did not find a broker when expected one to be found");
}
@Test(enabled = false)
@@ -290,7 +289,7 @@ public class SimpleLoadManagerImplTest {
rd.put("bandwidthIn", new ResourceUsage(250 * 1024, 1024 * 1024));
rd.put("bandwidthOut", new ResourceUsage(550 * 1024, 1024 * 1024));
- ResourceUnit ru1 = new SimpleResourceUnit("http://prod2-broker7.messaging.usw.example.com:8080", rd);
+ ResourceUnit ru1 = new SimpleResourceUnit("prod2-broker7.messaging.usw.example.com:8080", rd);
Set<ResourceUnit> rus = new HashSet<>();
rus.add(ru1);
LoadRanker lr = new ResourceAvailabilityRanker();
@@ -360,8 +359,8 @@ public class SimpleLoadManagerImplTest {
rd.put("bandwidthIn", new ResourceUsage(250 * 1024, 1024 * 1024));
rd.put("bandwidthOut", new ResourceUsage(550 * 1024, 1024 * 1024));
- ResourceUnit ru1 = new SimpleResourceUnit("http://pulsar-broker1.com:8080", rd);
- ResourceUnit ru2 = new SimpleResourceUnit("http://pulsar-broker2.com:8080", rd);
+ ResourceUnit ru1 = new SimpleResourceUnit("pulsar-broker1.com:8080", rd);
+ ResourceUnit ru2 = new SimpleResourceUnit("pulsar-broker2.com:8080", rd);
Set<ResourceUnit> rus = new HashSet<>();
rus.add(ru1);
rus.add(ru2);
@@ -414,22 +413,18 @@ public class SimpleLoadManagerImplTest {
final SimpleLoadManagerImpl loadManager = (SimpleLoadManagerImpl) pulsar1.getLoadManager().get();
for (final NamespaceBundle bundle : bundles) {
- if (loadManager.getLeastLoaded(bundle).get().getResourceId().equals(getAddress(primaryTlsHost))) {
+ if (loadManager.getLeastLoaded(bundle).get().getResourceId().equals(pulsar1.getBrokerId())) {
++numAssignedToPrimary;
} else {
++numAssignedToSecondary;
}
// Check that number of assigned bundles are equivalent when an even number have been assigned.
if ((numAssignedToPrimary + numAssignedToSecondary) % 2 == 0) {
- assert (numAssignedToPrimary == numAssignedToSecondary);
+ assertEquals(numAssignedToPrimary, numAssignedToSecondary);
}
}
}
- private static String getAddress(String url) {
- return url.replaceAll("https", "http");
- }
-
@Test
public void testNamespaceBundleStats() {
NamespaceBundleStats nsb1 = new NamespaceBundleStats();
@@ -519,7 +514,8 @@ public class SimpleLoadManagerImplTest {
}
private void setupClusters() throws PulsarAdminException {
- admin1.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build());
+ admin1.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress())
+ .brokerServiceUrl(pulsar1.getBrokerServiceUrl()).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use"));
defaultTenant = "prop-xyz";
admin1.tenants().createTenant(defaultTenant, tenantInfo);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index fca41837b9d..fdd1eb7272c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -75,7 +75,7 @@ public class BrokerRegistryTest {
private LocalBookkeeperEnsemble bkEnsemble;
- // Make sure the load manager don't register itself to `/loadbalance/brokers/{lookupServiceAddress}`
+ // Make sure the load manager doesn't register itself to `/loadbalance/brokers/{brokerId}`.
public static class MockLoadManager implements LoadManager {
@Override
@@ -291,7 +291,7 @@ public class BrokerRegistryTest {
pulsar1.start();
pulsar2.start();
- doReturn(pulsar1.getLookupServiceAddress()).when(pulsar2).getLookupServiceAddress();
+ doReturn(pulsar1.getBrokerId()).when(pulsar2).getBrokerId();
BrokerRegistryImpl brokerRegistry1 = createBrokerRegistryImpl(pulsar1);
BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2);
brokerRegistry1.start();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index bb7416ddc41..af150b44df8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -314,7 +314,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> brokers,
ServiceUnitId serviceUnit,
LoadManagerContext context) {
- brokers.remove(pulsar1.getLookupServiceAddress());
+ brokers.remove(pulsar1.getBrokerId());
return CompletableFuture.completedFuture(brokers);
}
@@ -399,10 +399,10 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
});
- String dstBrokerUrl = pulsar1.getLookupServiceAddress();
+ String dstBrokerUrl = pulsar1.getBrokerId();
String dstBrokerServiceUrl;
if (broker.equals(pulsar1.getBrokerServiceUrl())) {
- dstBrokerUrl = pulsar2.getLookupServiceAddress();
+ dstBrokerUrl = pulsar2.getBrokerId();
dstBrokerServiceUrl = pulsar2.getBrokerServiceUrl();
} else {
dstBrokerServiceUrl = pulsar1.getBrokerServiceUrl();
@@ -482,10 +482,10 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
final String dstBrokerUrl;
final String dstBrokerServiceUrl;
if (broker.equals(pulsar1.getBrokerServiceUrl())) {
- dstBrokerUrl = pulsar2.getLookupServiceAddress();
+ dstBrokerUrl = pulsar2.getBrokerId();
dstBrokerServiceUrl = pulsar2.getBrokerServiceUrl();
} else {
- dstBrokerUrl = pulsar1.getLookupServiceAddress();
+ dstBrokerUrl = pulsar1.getBrokerId();
dstBrokerServiceUrl = pulsar1.getBrokerServiceUrl();
}
checkOwnershipState(broker, bundle);
@@ -826,13 +826,13 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
TopicName topicName = topicAndBundle.getLeft();
NamespaceBundle bundle = topicAndBundle.getRight();
- String lookupServiceAddress1 = pulsar1.getLookupServiceAddress();
+ String brokerId1 = pulsar1.getBrokerId();
doReturn(List.of(new MockBrokerFilter() {
@Override
public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> brokers,
ServiceUnitId serviceUnit,
LoadManagerContext context) {
- brokers.remove(lookupServiceAddress1);
+ brokers.remove(brokerId1);
return CompletableFuture.completedFuture(brokers);
}
},new MockBrokerFilter() {
@@ -904,12 +904,12 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
// Test lookup heartbeat namespace's topic
for (PulsarService pulsar : pulsarServices) {
assertLookupHeartbeatOwner(pulsarService,
- pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ pulsar.getBrokerId(), pulsar.getBrokerServiceUrl());
}
// Test lookup SLA namespace's topic
for (PulsarService pulsar : pulsarServices) {
assertLookupSLANamespaceOwner(pulsarService,
- pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ pulsar.getBrokerId(), pulsar.getBrokerServiceUrl());
}
}
@@ -966,12 +966,12 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
// Test lookup heartbeat namespace's topic
for (PulsarService pulsar : pulsarServices) {
assertLookupHeartbeatOwner(pulsarService,
- pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ pulsar.getBrokerId(), pulsar.getBrokerServiceUrl());
}
// Test lookup SLA namespace's topic
for (PulsarService pulsar : pulsarServices) {
assertLookupSLANamespaceOwner(pulsarService,
- pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ pulsar.getBrokerId(), pulsar.getBrokerServiceUrl());
}
}
}
@@ -979,25 +979,25 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
}
private void assertLookupHeartbeatOwner(PulsarService pulsar,
- String lookupServiceAddress,
+ String brokerId,
String expectedBrokerServiceUrl) throws Exception {
NamespaceName heartbeatNamespaceV1 =
- getHeartbeatNamespace(lookupServiceAddress, pulsar.getConfiguration());
+ getHeartbeatNamespace(brokerId, pulsar.getConfiguration());
String heartbeatV1Topic = heartbeatNamespaceV1.getPersistentTopicName("test");
assertEquals(pulsar.getAdminClient().lookups().lookupTopic(heartbeatV1Topic), expectedBrokerServiceUrl);
NamespaceName heartbeatNamespaceV2 =
- getHeartbeatNamespaceV2(lookupServiceAddress, pulsar.getConfiguration());
+ getHeartbeatNamespaceV2(brokerId, pulsar.getConfiguration());
String heartbeatV2Topic = heartbeatNamespaceV2.getPersistentTopicName("test");
assertEquals(pulsar.getAdminClient().lookups().lookupTopic(heartbeatV2Topic), expectedBrokerServiceUrl);
}
private void assertLookupSLANamespaceOwner(PulsarService pulsar,
- String lookupServiceAddress,
+ String brokerId,
String expectedBrokerServiceUrl) throws Exception {
- NamespaceName slaMonitorNamespace = getSLAMonitorNamespace(lookupServiceAddress, pulsar.getConfiguration());
+ NamespaceName slaMonitorNamespace = getSLAMonitorNamespace(brokerId, pulsar.getConfiguration());
String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test");
String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
log.info("Topic {} Lookup result: {}", slaMonitorTopic, result);
@@ -1338,7 +1338,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
if (!pulsar3.getBrokerServiceUrl().equals(lookupResult1)) {
admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(),
- pulsar3.getLookupServiceAddress());
+ pulsar3.getBrokerId());
lookupResult1 = pulsar2.getAdminClient().lookups().lookupTopic(topic);
}
String lookupResult2 = pulsar1.getAdminClient().lookups().lookupTopic(topic);
@@ -1405,20 +1405,20 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
@Test(timeOut = 30 * 1000, priority = -1)
public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exception {
NamespaceName heartbeatNamespacePulsar1V1 =
- getHeartbeatNamespace(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration());
+ getHeartbeatNamespace(pulsar1.getBrokerId(), pulsar1.getConfiguration());
NamespaceName heartbeatNamespacePulsar1V2 =
- NamespaceService.getHeartbeatNamespaceV2(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration());
+ NamespaceService.getHeartbeatNamespaceV2(pulsar1.getBrokerId(), pulsar1.getConfiguration());
NamespaceName heartbeatNamespacePulsar2V1 =
- getHeartbeatNamespace(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
+ getHeartbeatNamespace(pulsar2.getBrokerId(), pulsar2.getConfiguration());
NamespaceName heartbeatNamespacePulsar2V2 =
- NamespaceService.getHeartbeatNamespaceV2(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
+ NamespaceService.getHeartbeatNamespaceV2(pulsar2.getBrokerId(), pulsar2.getConfiguration());
NamespaceName slaMonitorNamespacePulsar1 =
- getSLAMonitorNamespace(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration());
+ getSLAMonitorNamespace(pulsar1.getBrokerId(), pulsar1.getConfiguration());
NamespaceName slaMonitorNamespacePulsar2 =
- getSLAMonitorNamespace(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
+ getSLAMonitorNamespace(pulsar2.getBrokerId(), pulsar2.getConfiguration());
NamespaceBundle bundle1 = pulsar1.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespacePulsar1V1);
@@ -1448,9 +1448,9 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4));
assertTrue(ownedServiceUnitsByPulsar2.contains(slaBundle2));
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 =
- admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress());
+ admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getBrokerId());
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 =
- admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress());
+ admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getBrokerId());
assertTrue(ownedNamespacesByPulsar1.containsKey(bundle1.toString()));
assertTrue(ownedNamespacesByPulsar1.containsKey(bundle2.toString()));
assertTrue(ownedNamespacesByPulsar1.containsKey(slaBundle1.toString()));
@@ -1482,7 +1482,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
assertTrue(ownedBundles.contains(bundle));
});
Map<String, NamespaceOwnershipStatus> ownedNamespaces =
- admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar.getLookupServiceAddress());
+ admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar.getBrokerId());
assertTrue(ownedNamespaces.containsKey(bundle.toString()));
NamespaceOwnershipStatus status = ownedNamespaces.get(bundle.toString());
assertTrue(status.is_active);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 7bd12d66704..f7816151a42 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -109,8 +109,8 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
private PulsarService pulsar2;
private ServiceUnitStateChannel channel1;
private ServiceUnitStateChannel channel2;
- private String lookupServiceAddress1;
- private String lookupServiceAddress2;
+ private String brokerId1;
+ private String brokerId2;
private String bundle;
private String bundle1;
private String bundle2;
@@ -158,10 +158,10 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
channel2 = createChannel(pulsar2);
channel2.start();
- lookupServiceAddress1 = (String)
- FieldUtils.readDeclaredField(channel1, "lookupServiceAddress", true);
- lookupServiceAddress2 = (String)
- FieldUtils.readDeclaredField(channel2, "lookupServiceAddress", true);
+ brokerId1 = (String)
+ FieldUtils.readDeclaredField(channel1, "brokerId", true);
+ brokerId2 = (String)
+ FieldUtils.readDeclaredField(channel2, "brokerId", true);
bundle = "public/default/0x00000000_0xffffffff";
bundle1 = "public/default/0x00000000_0xfffffff0";
@@ -221,7 +221,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
assertEquals(newChannelOwner1, newChannelOwner2);
assertNotEquals(channelOwner1, newChannelOwner1);
- if (newChannelOwner1.equals(Optional.of(lookupServiceAddress1))) {
+ if (newChannelOwner1.equals(Optional.of(brokerId1))) {
assertTrue(channel1.isChannelOwnerAsync().get(2, TimeUnit.SECONDS));
assertFalse(channel2.isChannelOwnerAsync().get(2, TimeUnit.SECONDS));
} else {
@@ -306,7 +306,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
}
}
try {
- channel.publishAssignEventAsync(bundle, lookupServiceAddress1).get(2, TimeUnit.SECONDS);
+ channel.publishAssignEventAsync(bundle, brokerId1).get(2, TimeUnit.SECONDS);
} catch (ExecutionException e) {
if (e.getCause() instanceof IllegalStateException) {
errorCnt++;
@@ -314,7 +314,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
}
try {
channel.publishUnloadEventAsync(
- new Unload(lookupServiceAddress1, bundle, Optional.of(lookupServiceAddress2)))
+ new Unload(brokerId1, bundle, Optional.of(brokerId2)))
.get(2, TimeUnit.SECONDS);
} catch (ExecutionException e) {
if (e.getCause() instanceof IllegalStateException) {
@@ -322,7 +322,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
}
}
try {
- Split split = new Split(bundle, lookupServiceAddress1, Map.of(
+ Split split = new Split(bundle, brokerId1, Map.of(
childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty()));
channel.publishSplitEventAsync(split)
.get(2, TimeUnit.SECONDS);
@@ -363,8 +363,8 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
assertTrue(owner1.get().isEmpty());
assertTrue(owner2.get().isEmpty());
- var assigned1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
- var assigned2 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2);
+ var assigned1 = channel1.publishAssignEventAsync(bundle, brokerId1);
+ var assigned2 = channel2.publishAssignEventAsync(bundle, brokerId2);
assertNotNull(assigned1);
assertNotNull(assigned2);
waitUntilOwnerChanges(channel1, bundle, null);
@@ -373,8 +373,8 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
String assignedAddr2 = assigned2.get(5, TimeUnit.SECONDS);
assertEquals(assignedAddr1, assignedAddr2);
- assertTrue(assignedAddr1.equals(lookupServiceAddress1)
- || assignedAddr1.equals(lookupServiceAddress2), assignedAddr1);
+ assertTrue(assignedAddr1.equals(brokerId1)
+ || assignedAddr1.equals(brokerId2), assignedAddr1);
var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
@@ -415,13 +415,13 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
assertTrue(owner1.get().isEmpty());
assertTrue(owner2.get().isEmpty());
- var owner3 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
- var owner4 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2);
+ var owner3 = channel1.publishAssignEventAsync(bundle, brokerId1);
+ var owner4 = channel2.publishAssignEventAsync(bundle, brokerId2);
assertTrue(owner3.isCompletedExceptionally());
assertNotNull(owner4);
String ownerAddrOpt2 = owner4.get(5, TimeUnit.SECONDS);
- assertEquals(ownerAddrOpt2, lookupServiceAddress2);
- waitUntilNewOwner(channel1, bundle, lookupServiceAddress2);
+ assertEquals(ownerAddrOpt2, brokerId2);
+ waitUntilNewOwner(channel1, bundle, brokerId2);
assertEquals(0, getOwnerRequests1.size());
assertEquals(0, getOwnerRequests2.size());
@@ -439,25 +439,25 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
assertTrue(owner2.get().isEmpty());
- channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
- waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
- waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
+ channel1.publishAssignEventAsync(bundle, brokerId1);
+ waitUntilNewOwner(channel1, bundle, brokerId1);
+ waitUntilNewOwner(channel2, bundle, brokerId1);
var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
assertEquals(ownerAddr1, ownerAddr2);
- assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
+ assertEquals(ownerAddr1, Optional.of(brokerId1));
- Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.of(lookupServiceAddress2));
+ Unload unload = new Unload(brokerId1, bundle, Optional.of(brokerId2));
channel1.publishUnloadEventAsync(unload);
- waitUntilNewOwner(channel1, bundle, lookupServiceAddress2);
- waitUntilNewOwner(channel2, bundle, lookupServiceAddress2);
+ waitUntilNewOwner(channel1, bundle, brokerId2);
+ waitUntilNewOwner(channel2, bundle, brokerId2);
ownerAddr1 = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS);
ownerAddr2 = channel2.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS);
assertEquals(ownerAddr1, ownerAddr2);
- assertEquals(ownerAddr1, Optional.of(lookupServiceAddress2));
+ assertEquals(ownerAddr1, Optional.of(brokerId2));
validateHandlerCounters(channel1, 2, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
validateHandlerCounters(channel2, 2, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
@@ -474,14 +474,14 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
assertEquals(0, getOwnerRequests1.size());
assertEquals(0, getOwnerRequests2.size());
- channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
- waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
- waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
+ channel1.publishAssignEventAsync(bundle, brokerId1);
+ waitUntilNewOwner(channel1, bundle, brokerId1);
+ waitUntilNewOwner(channel2, bundle, brokerId1);
var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
assertEquals(ownerAddr1, ownerAddr2);
- assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
+ assertEquals(ownerAddr1, Optional.of(brokerId1));
var producer = (Producer<ServiceUnitStateData>) FieldUtils.readDeclaredField(channel1,
"producer", true);
@@ -497,7 +497,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
"inFlightStateWaitingTimeInMillis", 3 * 1000, true);
FieldUtils.writeDeclaredField(channel2,
"inFlightStateWaitingTimeInMillis", 3 * 1000, true);
- Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.of(lookupServiceAddress2));
+ Unload unload = new Unload(brokerId1, bundle, Optional.of(brokerId2));
channel1.publishUnloadEventAsync(unload);
// channel1 is broken. the ownership transfer won't be complete.
waitUntilState(channel1, bundle);
@@ -521,7 +521,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
assertEquals(0, getOwnerRequests2.size());
// recovered, check the monitor update state : Assigned -> Owned
- doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
+ doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1)))
.when(loadManager).selectAsync(any(), any());
FieldUtils.writeDeclaredField(channel2, "producer", producer, true);
FieldUtils.writeDeclaredField(channel1,
@@ -530,18 +530,18 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
"inFlightStateWaitingTimeInMillis", 1 , true);
((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
- List.of(lookupServiceAddress1, lookupServiceAddress2));
+ List.of(brokerId1, brokerId2));
((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
- List.of(lookupServiceAddress1, lookupServiceAddress2));
+ List.of(brokerId1, brokerId2));
- waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
- waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
+ waitUntilNewOwner(channel1, bundle, brokerId1);
+ waitUntilNewOwner(channel2, bundle, brokerId1);
ownerAddr1 = channel1.getOwnerAsync(bundle).get();
ownerAddr2 = channel2.getOwnerAsync(bundle).get();
assertEquals(ownerAddr1, ownerAddr2);
- assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
+ assertEquals(ownerAddr1, Optional.of(brokerId1));
var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
validateMonitorCounters(leader,
@@ -562,13 +562,13 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
@Test(priority = 6)
public void splitAndRetryTest() throws Exception {
- channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
- waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
- waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
+ channel1.publishAssignEventAsync(bundle, brokerId1);
+ waitUntilNewOwner(channel1, bundle, brokerId1);
+ waitUntilNewOwner(channel2, bundle, brokerId1);
var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
- assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
- assertEquals(ownerAddr2, Optional.of(lookupServiceAddress1));
+ assertEquals(ownerAddr1, Optional.of(brokerId1));
+ assertEquals(ownerAddr2, Optional.of(brokerId1));
assertTrue(ownerAddr1.isPresent());
NamespaceService namespaceService = pulsar1.getNamespaceService();
@@ -609,14 +609,14 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
- waitUntilNewOwner(channel1, childBundle11, lookupServiceAddress1);
- waitUntilNewOwner(channel1, childBundle12, lookupServiceAddress1);
- waitUntilNewOwner(channel2, childBundle11, lookupServiceAddress1);
- waitUntilNewOwner(channel2, childBundle12, lookupServiceAddress1);
- assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(childBundle11).get());
- assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(childBundle12).get());
- assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle11).get());
- assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle12).get());
+ waitUntilNewOwner(channel1, childBundle11, brokerId1);
+ waitUntilNewOwner(channel1, childBundle12, brokerId1);
+ waitUntilNewOwner(channel2, childBundle11, brokerId1);
+ waitUntilNewOwner(channel2, childBundle12, brokerId1);
+ assertEquals(Optional.of(brokerId1), channel1.getOwnerAsync(childBundle11).get());
+ assertEquals(Optional.of(brokerId1), channel1.getOwnerAsync(childBundle12).get());
+ assertEquals(Optional.of(brokerId1), channel2.getOwnerAsync(childBundle11).get());
+ assertEquals(Optional.of(brokerId1), channel2.getOwnerAsync(childBundle12).get());
// try the monitor and check the monitor moves `Deleted` -> `Init`
@@ -631,9 +631,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
"stateTombstoneDelayTimeInMillis", 1, true);
((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
- List.of(lookupServiceAddress1, lookupServiceAddress2));
+ List.of(brokerId1, brokerId2));
((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
- List.of(lookupServiceAddress1, lookupServiceAddress2));
+ List.of(brokerId1, brokerId2));
waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);
@@ -727,7 +727,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
String leader = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
assertEquals(leader, leader2);
- if (leader.equals(lookupServiceAddress2)) {
+ if (leader.equals(brokerId2)) {
leaderChannel = channel2;
followerChannel = channel1;
var tmp = followerCleanupJobsTmp;
@@ -743,12 +743,12 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
var owner1 = channel1.getOwnerAsync(bundle1);
var owner2 = channel2.getOwnerAsync(bundle2);
- doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
+ doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2)))
.when(loadManager).selectAsync(any(), any());
assertTrue(owner1.get().isEmpty());
assertTrue(owner2.get().isEmpty());
- String broker = lookupServiceAddress1;
+ String broker = brokerId1;
channel1.publishAssignEventAsync(bundle1, broker);
channel2.publishAssignEventAsync(bundle2, broker);
@@ -758,9 +758,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
waitUntilNewOwner(channel2, bundle2, broker);
// Verify to transfer the ownership to the other broker.
- channel1.publishUnloadEventAsync(new Unload(broker, bundle1, Optional.of(lookupServiceAddress2)));
- waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
- waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2);
+ channel1.publishUnloadEventAsync(new Unload(broker, bundle1, Optional.of(brokerId2)));
+ waitUntilNewOwner(channel1, bundle1, brokerId2);
+ waitUntilNewOwner(channel2, bundle1, brokerId2);
// test stable metadata state
leaderChannel.handleMetadataSessionEvent(SessionReestablished);
@@ -771,13 +771,13 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
- leaderChannel.handleBrokerRegistrationEvent(lookupServiceAddress2, NotificationType.Deleted);
- followerChannel.handleBrokerRegistrationEvent(lookupServiceAddress2, NotificationType.Deleted);
+ leaderChannel.handleBrokerRegistrationEvent(brokerId2, NotificationType.Deleted);
+ followerChannel.handleBrokerRegistrationEvent(brokerId2, NotificationType.Deleted);
- waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
- waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2);
- waitUntilNewOwner(channel1, bundle2, lookupServiceAddress2);
- waitUntilNewOwner(channel2, bundle2, lookupServiceAddress2);
+ waitUntilNewOwner(channel1, bundle1, brokerId2);
+ waitUntilNewOwner(channel2, bundle1, brokerId2);
+ waitUntilNewOwner(channel1, bundle2, brokerId2);
+ waitUntilNewOwner(channel2, bundle2, brokerId2);
verify(leaderCleanupJobs, times(1)).computeIfAbsent(eq(broker), any());
verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any());
@@ -798,8 +798,8 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
// test jittery metadata state
- channel1.publishUnloadEventAsync(new Unload(lookupServiceAddress2, bundle1, Optional.of(broker)));
- channel1.publishUnloadEventAsync(new Unload(lookupServiceAddress2, bundle2, Optional.of(broker)));
+ channel1.publishUnloadEventAsync(new Unload(brokerId2, bundle1, Optional.of(broker)));
+ channel1.publishUnloadEventAsync(new Unload(brokerId2, bundle2, Optional.of(broker)));
waitUntilNewOwner(channel1, bundle1, broker);
waitUntilNewOwner(channel2, bundle1, broker);
waitUntilNewOwner(channel1, bundle2, broker);
@@ -871,10 +871,10 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
1);
// finally cleanup
- waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
- waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2);
- waitUntilNewOwner(channel1, bundle2, lookupServiceAddress2);
- waitUntilNewOwner(channel2, bundle2, lookupServiceAddress2);
+ waitUntilNewOwner(channel1, bundle1, brokerId2);
+ waitUntilNewOwner(channel2, bundle1, brokerId2);
+ waitUntilNewOwner(channel1, bundle2, brokerId2);
+ waitUntilNewOwner(channel2, bundle2, brokerId2);
verify(leaderCleanupJobs, times(3)).computeIfAbsent(eq(broker), any());
verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any());
@@ -893,8 +893,8 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
1);
// test unstable state
- channel1.publishUnloadEventAsync(new Unload(lookupServiceAddress2, bundle1, Optional.of(broker)));
- channel1.publishUnloadEventAsync(new Unload(lookupServiceAddress2, bundle2, Optional.of(broker)));
+ channel1.publishUnloadEventAsync(new Unload(brokerId2, bundle1, Optional.of(broker)));
+ channel1.publishUnloadEventAsync(new Unload(brokerId2, bundle2, Optional.of(broker)));
waitUntilNewOwner(channel1, bundle1, broker);
waitUntilNewOwner(channel2, bundle1, broker);
waitUntilNewOwner(channel1, bundle2, broker);
@@ -938,17 +938,17 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
assertTrue(owner1.get().isEmpty());
assertTrue(owner2.get().isEmpty());
- var assigned1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
+ var assigned1 = channel1.publishAssignEventAsync(bundle, brokerId1);
assertNotNull(assigned1);
- waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
- waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
+ waitUntilNewOwner(channel1, bundle, brokerId1);
+ waitUntilNewOwner(channel2, bundle, brokerId1);
String assignedAddr1 = assigned1.get(5, TimeUnit.SECONDS);
- assertEquals(lookupServiceAddress1, assignedAddr1);
+ assertEquals(brokerId1, assignedAddr1);
FieldUtils.writeDeclaredField(channel2,
"inFlightStateWaitingTimeInMillis", 3 * 1000, true);
- var assigned2 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2);
+ var assigned2 = channel2.publishAssignEventAsync(bundle, brokerId2);
assertNotNull(assigned2);
Exception ex = null;
try {
@@ -957,8 +957,8 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
ex = e;
}
assertNull(ex);
- assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(bundle).get());
- assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(bundle).get());
+ assertEquals(Optional.of(brokerId1), channel2.getOwnerAsync(bundle).get());
+ assertEquals(Optional.of(brokerId1), channel1.getOwnerAsync(bundle).get());
var compactor = spy (pulsar1.getStrategicCompactor());
Field strategicCompactorField = FieldUtils.getDeclaredField(PulsarService.class, "strategicCompactor", true);
@@ -968,7 +968,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
.pollInterval(200, TimeUnit.MILLISECONDS)
.atMost(140, TimeUnit.SECONDS)
.untilAsserted(() -> {
- channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
+ channel1.publishAssignEventAsync(bundle, brokerId1);
verify(compactor, times(1))
.compact(eq(ServiceUnitStateChannelImpl.TOPIC), any());
});
@@ -980,7 +980,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
.pollInterval(200, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(
- channel3.getOwnerAsync(bundle).get(), Optional.of(lookupServiceAddress1)));
+ channel3.getOwnerAsync(bundle).get(), Optional.of(brokerId1)));
channel3.close();
FieldUtils.writeDeclaredField(channel2,
"inFlightStateWaitingTimeInMillis", 30 * 1000, true);
@@ -1025,16 +1025,16 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
public void unloadTest()
throws ExecutionException, InterruptedException, IllegalAccessException {
- channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
+ channel1.publishAssignEventAsync(bundle, brokerId1);
- waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
- waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
+ waitUntilNewOwner(channel1, bundle, brokerId1);
+ waitUntilNewOwner(channel2, bundle, brokerId1);
var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
assertEquals(ownerAddr1, ownerAddr2);
- assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
- Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.empty());
+ assertEquals(ownerAddr1, Optional.of(brokerId1));
+ Unload unload = new Unload(brokerId1, bundle, Optional.empty());
channel1.publishUnloadEventAsync(unload);
@@ -1046,17 +1046,17 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
assertEquals(Optional.empty(), owner1.get());
assertEquals(Optional.empty(), owner2.get());
- channel2.publishAssignEventAsync(bundle, lookupServiceAddress2);
+ channel2.publishAssignEventAsync(bundle, brokerId2);
- waitUntilNewOwner(channel1, bundle, lookupServiceAddress2);
- waitUntilNewOwner(channel2, bundle, lookupServiceAddress2);
+ waitUntilNewOwner(channel1, bundle, brokerId2);
+ waitUntilNewOwner(channel2, bundle, brokerId2);
ownerAddr1 = channel1.getOwnerAsync(bundle).get();
ownerAddr2 = channel2.getOwnerAsync(bundle).get();
assertEquals(ownerAddr1, ownerAddr2);
- assertEquals(ownerAddr1, Optional.of(lookupServiceAddress2));
- Unload unload2 = new Unload(lookupServiceAddress2, bundle, Optional.empty());
+ assertEquals(ownerAddr1, Optional.of(brokerId2));
+ Unload unload2 = new Unload(brokerId2, bundle, Optional.empty());
channel2.publishUnloadEventAsync(unload2);
@@ -1075,9 +1075,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
"stateTombstoneDelayTimeInMillis", 1, true);
((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
- List.of(lookupServiceAddress1, lookupServiceAddress2));
+ List.of(brokerId1, brokerId2));
((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
- List.of(lookupServiceAddress1, lookupServiceAddress2));
+ List.of(brokerId1, brokerId2));
waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);
@@ -1107,7 +1107,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
public void assignTestWhenDestBrokerProducerFails()
throws ExecutionException, InterruptedException, IllegalAccessException {
- Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.empty());
+ Unload unload = new Unload(brokerId1, bundle, Optional.empty());
channel1.publishUnloadEventAsync(unload);
@@ -1131,9 +1131,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
"inFlightStateWaitingTimeInMillis", 3 * 1000, true);
FieldUtils.writeDeclaredField(channel2,
"inFlightStateWaitingTimeInMillis", 3 * 1000, true);
- doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
+ doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2)))
.when(loadManager).selectAsync(any(), any());
- channel1.publishAssignEventAsync(bundle, lookupServiceAddress2);
+ channel1.publishAssignEventAsync(bundle, brokerId2);
// channel1 is broken. the assign won't be complete.
waitUntilState(channel1, bundle);
waitUntilState(channel2, bundle);
@@ -1157,18 +1157,18 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
"inFlightStateWaitingTimeInMillis", 1 , true);
((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
- List.of(lookupServiceAddress1, lookupServiceAddress2));
+ List.of(brokerId1, brokerId2));
((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
- List.of(lookupServiceAddress1, lookupServiceAddress2));
+ List.of(brokerId1, brokerId2));
- waitUntilNewOwner(channel1, bundle, lookupServiceAddress2);
- waitUntilNewOwner(channel2, bundle, lookupServiceAddress2);
+ waitUntilNewOwner(channel1, bundle, brokerId2);
+ waitUntilNewOwner(channel2, bundle, brokerId2);
var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
assertEquals(ownerAddr1, ownerAddr2);
- assertEquals(ownerAddr1, Optional.of(lookupServiceAddress2));
+ assertEquals(ownerAddr1, Optional.of(brokerId2));
var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
validateMonitorCounters(leader,
@@ -1192,20 +1192,20 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
throws ExecutionException, InterruptedException, IllegalAccessException {
- Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.empty());
+ Unload unload = new Unload(brokerId1, bundle, Optional.empty());
channel1.publishUnloadEventAsync(unload);
waitUntilState(channel1, bundle, Free);
waitUntilState(channel2, bundle, Free);
- channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
+ channel1.publishAssignEventAsync(bundle, brokerId1);
waitUntilState(channel1, bundle, Owned);
waitUntilState(channel2, bundle, Owned);
- assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get().get());
- assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(bundle).get().get());
+ assertEquals(brokerId1, channel1.getOwnerAsync(bundle).get().get());
+ assertEquals(brokerId1, channel2.getOwnerAsync(bundle).get().get());
var producer = (Producer<ServiceUnitStateData>) FieldUtils.readDeclaredField(channel1,
"producer", true);
@@ -1224,7 +1224,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
// Assert child bundle ownerships in the channels.
- Split split = new Split(bundle, lookupServiceAddress1, Map.of(
+ Split split = new Split(bundle, brokerId1, Map.of(
childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty()));
channel2.publishSplitEventAsync(split);
// channel1 is broken. the split won't be complete.
@@ -1271,13 +1271,13 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
assertFalse(owner1);
assertFalse(owner2);
- owner1 = channel1.isOwner(bundle, lookupServiceAddress2);
- owner2 = channel2.isOwner(bundle, lookupServiceAddress1);
+ owner1 = channel1.isOwner(bundle, brokerId2);
+ owner2 = channel2.isOwner(bundle, brokerId1);
assertFalse(owner1);
assertFalse(owner2);
- channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
+ channel1.publishAssignEventAsync(bundle, brokerId1);
owner2 = channel2.isOwner(bundle);
assertFalse(owner2);
@@ -1290,34 +1290,34 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
assertTrue(owner1);
assertFalse(owner2);
- owner1 = channel1.isOwner(bundle, lookupServiceAddress1);
- owner2 = channel2.isOwner(bundle, lookupServiceAddress2);
+ owner1 = channel1.isOwner(bundle, brokerId1);
+ owner2 = channel2.isOwner(bundle, brokerId2);
assertTrue(owner1);
assertFalse(owner2);
- owner1 = channel2.isOwner(bundle, lookupServiceAddress1);
- owner2 = channel1.isOwner(bundle, lookupServiceAddress2);
+ owner1 = channel2.isOwner(bundle, brokerId1);
+ owner2 = channel1.isOwner(bundle, brokerId2);
assertTrue(owner1);
assertFalse(owner2);
- overrideTableView(channel1, bundle, new ServiceUnitStateData(Assigning, lookupServiceAddress1, 1));
+ overrideTableView(channel1, bundle, new ServiceUnitStateData(Assigning, brokerId1, 1));
assertFalse(channel1.isOwner(bundle));
- overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, lookupServiceAddress1, 1));
+ overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, brokerId1, 1));
assertTrue(channel1.isOwner(bundle));
- overrideTableView(channel1, bundle, new ServiceUnitStateData(Releasing, null, lookupServiceAddress1, 1));
+ overrideTableView(channel1, bundle, new ServiceUnitStateData(Releasing, null, brokerId1, 1));
assertFalse(channel1.isOwner(bundle));
- overrideTableView(channel1, bundle, new ServiceUnitStateData(Splitting, null, lookupServiceAddress1, 1));
+ overrideTableView(channel1, bundle, new ServiceUnitStateData(Splitting, null, brokerId1, 1));
assertTrue(channel1.isOwner(bundle));
- overrideTableView(channel1, bundle, new ServiceUnitStateData(Free, null, lookupServiceAddress1, 1));
+ overrideTableView(channel1, bundle, new ServiceUnitStateData(Free, null, brokerId1, 1));
assertFalse(channel1.isOwner(bundle));
- overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted, null, lookupServiceAddress1, 1));
+ overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted, null, brokerId1, 1));
assertFalse(channel1.isOwner(bundle));
overrideTableView(channel1, bundle, null);
@@ -1326,13 +1326,13 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
@Test(priority = 16)
public void splitAndRetryFailureTest() throws Exception {
- channel1.publishAssignEventAsync(bundle3, lookupServiceAddress1);
- waitUntilNewOwner(channel1, bundle3, lookupServiceAddress1);
- waitUntilNewOwner(channel2, bundle3, lookupServiceAddress1);
+ channel1.publishAssignEventAsync(bundle3, brokerId1);
+ waitUntilNewOwner(channel1, bundle3, brokerId1);
+ waitUntilNewOwner(channel2, bundle3, brokerId1);
var ownerAddr1 = channel1.getOwnerAsync(bundle3).get();
var ownerAddr2 = channel2.getOwnerAsync(bundle3).get();
- assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
- assertEquals(ownerAddr2, Optional.of(lookupServiceAddress1));
+ assertEquals(ownerAddr1, Optional.of(brokerId1));
+ assertEquals(ownerAddr2, Optional.of(brokerId1));
assertTrue(ownerAddr1.isPresent());
NamespaceService namespaceService = pulsar1.getNamespaceService();
@@ -1373,7 +1373,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
});
var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
((ServiceUnitStateChannelImpl) leader)
- .monitorOwnerships(List.of(lookupServiceAddress1, lookupServiceAddress2));
+ .monitorOwnerships(List.of(brokerId1, brokerId2));
waitUntilState(leader, bundle3, Deleted);
waitUntilState(channel1, bundle3, Deleted);
waitUntilState(channel2, bundle3, Deleted);
@@ -1384,14 +1384,14 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
- waitUntilNewOwner(channel1, childBundle31, lookupServiceAddress1);
- waitUntilNewOwner(channel1, childBundle32, lookupServiceAddress1);
- waitUntilNewOwner(channel2, childBundle31, lookupServiceAddress1);
- waitUntilNewOwner(channel2, childBundle32, lookupServiceAddress1);
- assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(childBundle31).get());
- assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(childBundle32).get());
- assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle31).get());
- assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle32).get());
+ waitUntilNewOwner(channel1, childBundle31, brokerId1);
+ waitUntilNewOwner(channel1, childBundle32, brokerId1);
+ waitUntilNewOwner(channel2, childBundle31, brokerId1);
+ waitUntilNewOwner(channel2, childBundle32, brokerId1);
+ assertEquals(Optional.of(brokerId1), channel1.getOwnerAsync(childBundle31).get());
+ assertEquals(Optional.of(brokerId1), channel1.getOwnerAsync(childBundle32).get());
+ assertEquals(Optional.of(brokerId1), channel2.getOwnerAsync(childBundle31).get());
+ assertEquals(Optional.of(brokerId1), channel2.getOwnerAsync(childBundle32).get());
// try the monitor and check the monitor moves `Deleted` -> `Init`
@@ -1402,9 +1402,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
"stateTombstoneDelayTimeInMillis", 1, true);
((ServiceUnitStateChannelImpl) channel1).monitorOwnerships(
- List.of(lookupServiceAddress1, lookupServiceAddress2));
+ List.of(brokerId1, brokerId2));
((ServiceUnitStateChannelImpl) channel2).monitorOwnerships(
- List.of(lookupServiceAddress1, lookupServiceAddress2));
+ List.of(brokerId1, brokerId2));
waitUntilState(channel1, bundle3, Init);
waitUntilState(channel2, bundle3, Init);
@@ -1440,12 +1440,12 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
String leader = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
assertEquals(leader, leader2);
- if (leader.equals(lookupServiceAddress2)) {
+ if (leader.equals(brokerId2)) {
leaderChannel = channel2;
followerChannel = channel1;
}
- String broker = lookupServiceAddress1;
+ String broker = brokerId1;
// test override states
String releasingBundle = "public/releasing/0xfffffff0_0xffffffff";
@@ -1470,7 +1470,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
new ServiceUnitStateData(Owned, broker, null, 1));
// test stable metadata state
- doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
+ doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2)))
.when(loadManager).selectAsync(any(), any());
leaderChannel.handleMetadataSessionEvent(SessionReestablished);
followerChannel.handleMetadataSessionEvent(SessionReestablished);
@@ -1481,11 +1481,11 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
- waitUntilNewOwner(channel2, releasingBundle, lookupServiceAddress2);
- waitUntilNewOwner(channel2, childBundle11, lookupServiceAddress2);
- waitUntilNewOwner(channel2, childBundle12, lookupServiceAddress2);
- waitUntilNewOwner(channel2, assigningBundle, lookupServiceAddress2);
- waitUntilNewOwner(channel2, ownedBundle, lookupServiceAddress2);
+ waitUntilNewOwner(channel2, releasingBundle, brokerId2);
+ waitUntilNewOwner(channel2, childBundle11, brokerId2);
+ waitUntilNewOwner(channel2, childBundle12, brokerId2);
+ waitUntilNewOwner(channel2, assigningBundle, brokerId2);
+ waitUntilNewOwner(channel2, ownedBundle, brokerId2);
assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get());
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
@@ -1505,12 +1505,12 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
String leader = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
assertEquals(leader, leader2);
- if (leader.equals(lookupServiceAddress2)) {
+ if (leader.equals(brokerId2)) {
leaderChannel = channel2;
followerChannel = channel1;
}
- String broker = lookupServiceAddress1;
+ String broker = brokerId1;
// test override states
String releasingBundle = "public/releasing/0xfffffff0_0xffffffff";
@@ -1535,19 +1535,19 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
new ServiceUnitStateData(Owned, broker, null, 1));
// test stable metadata state
- doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
+ doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2)))
.when(loadManager).selectAsync(any(), any());
FieldUtils.writeDeclaredField(leaderChannel, "inFlightStateWaitingTimeInMillis",
-1, true);
FieldUtils.writeDeclaredField(followerChannel, "inFlightStateWaitingTimeInMillis",
-1, true);
((ServiceUnitStateChannelImpl) leaderChannel)
- .monitorOwnerships(List.of(lookupServiceAddress1, lookupServiceAddress2));
+ .monitorOwnerships(List.of(brokerId1, brokerId2));
waitUntilNewOwner(channel2, releasingBundle, broker);
waitUntilNewOwner(channel2, childBundle11, broker);
waitUntilNewOwner(channel2, childBundle12, broker);
- waitUntilNewOwner(channel2, assigningBundle, lookupServiceAddress2);
+ waitUntilNewOwner(channel2, assigningBundle, brokerId2);
waitUntilNewOwner(channel2, ownedBundle, broker);
assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get());
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
@@ -1566,7 +1566,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
// set the bundle owner is the broker
- String broker = lookupServiceAddress2;
+ String broker = brokerId2;
String bundle = "public/owned/0xfffffff0_0xffffffff";
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
@@ -1596,7 +1596,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
String leader1 = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
assertEquals(leader1, leader2);
- if (leader1.equals(lookupServiceAddress2)) {
+ if (leader1.equals(brokerId2)) {
leaderChannel = channel2;
}
leaderChannel.handleMetadataSessionEvent(SessionReestablished);
@@ -1611,10 +1611,10 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty());
assertTrue(System.currentTimeMillis() - start < 20_000);
- // simulate ownership cleanup(lookupServiceAddress1 selected owner) by the leader channel
+ // simulate ownership cleanup(brokerId1 selected owner) by the leader channel
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
- doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
+ doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1)))
.when(loadManager).selectAsync(any(), any());
leaderChannel.handleMetadataSessionEvent(SessionReestablished);
FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
@@ -1622,9 +1622,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
getCleanupJobs(leaderChannel).clear();
leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
- // verify the ownership cleanup, and channel's getOwnerAsync returns lookupServiceAddress1 without timeout
+ // verify the ownership cleanup, and channel's getOwnerAsync returns brokerId1 without timeout
start = System.currentTimeMillis();
- assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get().get());
+ assertEquals(brokerId1, channel1.getOwnerAsync(bundle).get().get());
assertTrue(System.currentTimeMillis() - start < 20_000);
// test clean-up
@@ -1746,7 +1746,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
.atMost(10, TimeUnit.SECONDS)
.until(() -> { // wait until true
((ServiceUnitStateChannelImpl) channel)
- .monitorOwnerships(List.of(lookupServiceAddress1, lookupServiceAddress2));
+ .monitorOwnerships(List.of(brokerId1, brokerId2));
ServiceUnitStateData data = tv.get(key);
ServiceUnitState actual = state(data);
return actual == expected;
@@ -1968,7 +1968,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
var leaderElectionService = new LeaderElectionService(
- pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(),
+ pulsar.getCoordinationService(), pulsar.getBrokerId(), pulsar.getSafeWebServiceAddress(),
state -> {
if (state == LeaderElectionState.Leading) {
channel.scheduleOwnershipMonitor();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
index f45e1405e1d..87aaf4bac7f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
@@ -84,20 +84,20 @@ public class BrokerIsolationPoliciesFilterTest {
// a. available-brokers: broker1, broker2, broker3 => result: broker1
Map<String, BrokerLookupData> result = filter.filterAsync(new HashMap<>(Map.of(
- "broker1", getLookupData(),
- "broker2", getLookupData(),
- "broker3", getLookupData())), namespaceName, getContext()).get();
- assertEquals(result.keySet(), Set.of("broker1"));
+ "broker1:8080", getLookupData(),
+ "broker2:8080", getLookupData(),
+ "broker3:8080", getLookupData())), namespaceName, getContext()).get();
+ assertEquals(result.keySet(), Set.of("broker1:8080"));
// b. available-brokers: broker2, broker3 => result: broker2
result = filter.filterAsync(new HashMap<>(Map.of(
- "broker2", getLookupData(),
- "broker3", getLookupData())), namespaceName, getContext()).get();
- assertEquals(result.keySet(), Set.of("broker2"));
+ "broker2:8080", getLookupData(),
+ "broker3:8080", getLookupData())), namespaceName, getContext()).get();
+ assertEquals(result.keySet(), Set.of("broker2:8080"));
// c. available-brokers: broker3 => result: NULL
result = filter.filterAsync(new HashMap<>(Map.of(
- "broker3", getLookupData())), namespaceName, getContext()).get();
+ "broker3:8080", getLookupData())), namespaceName, getContext()).get();
assertTrue(result.isEmpty());
// 2. Namespace: primary=broker1, secondary=broker2, shared=broker3, min_limit = 2
@@ -105,20 +105,20 @@ public class BrokerIsolationPoliciesFilterTest {
// a. available-brokers: broker1, broker2, broker3 => result: broker1, broker2
result = filter.filterAsync(new HashMap<>(Map.of(
- "broker1", getLookupData(),
- "broker2", getLookupData(),
- "broker3", getLookupData())), namespaceName, getContext()).get();
- assertEquals(result.keySet(), Set.of("broker1", "broker2"));
+ "broker1:8080", getLookupData(),
+ "broker2:8080", getLookupData(),
+ "broker3:8080", getLookupData())), namespaceName, getContext()).get();
+ assertEquals(result.keySet(), Set.of("broker1:8080", "broker2:8080"));
// b. available-brokers: broker2, broker3 => result: broker2
result = filter.filterAsync(new HashMap<>(Map.of(
- "broker2", getLookupData(),
- "broker3", getLookupData())), namespaceName, getContext()).get();
- assertEquals(result.keySet(), Set.of("broker2"));
+ "broker2:8080", getLookupData(),
+ "broker3:8080", getLookupData())), namespaceName, getContext()).get();
+ assertEquals(result.keySet(), Set.of("broker2:8080"));
// c. available-brokers: broker3 => result: NULL
result = filter.filterAsync(new HashMap<>(Map.of(
- "broker3", getLookupData())), namespaceName, getContext()).get();
+ "broker3:8080", getLookupData())), namespaceName, getContext()).get();
assertTrue(result.isEmpty());
}
@@ -142,31 +142,31 @@ public class BrokerIsolationPoliciesFilterTest {
Map<String, BrokerLookupData> result = filter.filterAsync(new HashMap<>(Map.of(
- "broker1", getLookupData(),
- "broker2", getLookupData(),
- "broker3", getLookupData())), namespaceBundle, getContext()).get();
- assertEquals(result.keySet(), Set.of("broker1", "broker2", "broker3"));
+ "broker1:8080", getLookupData(),
+ "broker2:8080", getLookupData(),
+ "broker3:8080", getLookupData())), namespaceBundle, getContext()).get();
+ assertEquals(result.keySet(), Set.of("broker1:8080", "broker2:8080", "broker3:8080"));
result = filter.filterAsync(new HashMap<>(Map.of(
- "broker1", getLookupData(true, false),
- "broker2", getLookupData(true, false),
- "broker3", getLookupData())), namespaceBundle, getContext()).get();
- assertEquals(result.keySet(), Set.of("broker3"));
+ "broker1:8080", getLookupData(true, false),
+ "broker2:8080", getLookupData(true, false),
+ "broker3:8080", getLookupData())), namespaceBundle, getContext()).get();
+ assertEquals(result.keySet(), Set.of("broker3:8080"));
doReturn(false).when(namespaceBundle).hasNonPersistentTopic();
result = filter.filterAsync(new HashMap<>(Map.of(
- "broker1", getLookupData(),
- "broker2", getLookupData(),
- "broker3", getLookupData())), namespaceBundle, getContext()).get();
- assertEquals(result.keySet(), Set.of("broker1", "broker2", "broker3"));
+ "broker1:8080", getLookupData(),
+ "broker2:8080", getLookupData(),
+ "broker3:8080", getLookupData())), namespaceBundle, getContext()).get();
+ assertEquals(result.keySet(), Set.of("broker1:8080", "broker2:8080", "broker3:8080"));
result = filter.filterAsync(new HashMap<>(Map.of(
- "broker1", getLookupData(false, true),
- "broker2", getLookupData(),
- "broker3", getLookupData())), namespaceBundle, getContext()).get();
- assertEquals(result.keySet(), Set.of("broker2", "broker3"));
+ "broker1:8080", getLookupData(false, true),
+ "broker2:8080", getLookupData(),
+ "broker3:8080", getLookupData())), namespaceBundle, getContext()).get();
+ assertEquals(result.keySet(), Set.of("broker2:8080", "broker3:8080"));
}
private void setIsolationPolicies(SimpleResourceAllocationPolicies policies,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 4eec6124777..0ff64616973 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -100,6 +100,7 @@ import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+import org.assertj.core.api.Assertions;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -168,18 +169,18 @@ public class TransferShedderTest {
var ctx = getContext();
var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
- topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("my-tenant/my-namespaceA", 1000000, 2000000));
- topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("my-tenant/my-namespaceB", 1000000, 3000000));
- topBundlesLoadDataStore.pushAsync("broker3", getTopBundlesLoad("my-tenant/my-namespaceC", 2000000, 4000000));
- topBundlesLoadDataStore.pushAsync("broker4", getTopBundlesLoad("my-tenant/my-namespaceD", 2000000, 6000000));
- topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("my-tenant/my-namespaceE", 2000000, 7000000));
+ topBundlesLoadDataStore.pushAsync("broker1:8080", getTopBundlesLoad("my-tenant/my-namespaceA", 1000000, 2000000));
+ topBundlesLoadDataStore.pushAsync("broker2:8080", getTopBundlesLoad("my-tenant/my-namespaceB", 1000000, 3000000));
+ topBundlesLoadDataStore.pushAsync("broker3:8080", getTopBundlesLoad("my-tenant/my-namespaceC", 2000000, 4000000));
+ topBundlesLoadDataStore.pushAsync("broker4:8080", getTopBundlesLoad("my-tenant/my-namespaceD", 2000000, 6000000));
+ topBundlesLoadDataStore.pushAsync("broker5:8080", getTopBundlesLoad("my-tenant/my-namespaceE", 2000000, 7000000));
var brokerLoadDataStore = ctx.brokerLoadDataStore();
- brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx, 2, "broker1"));
- brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx, 4, "broker2"));
- brokerLoadDataStore.pushAsync("broker3", getCpuLoad(ctx, 6, "broker3"));
- brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx, 80, "broker4"));
- brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx, 90, "broker5"));
+ brokerLoadDataStore.pushAsync("broker1:8080", getCpuLoad(ctx, 2, "broker1:8080"));
+ brokerLoadDataStore.pushAsync("broker2:8080", getCpuLoad(ctx, 4, "broker2:8080"));
+ brokerLoadDataStore.pushAsync("broker3:8080", getCpuLoad(ctx, 6, "broker3:8080"));
+ brokerLoadDataStore.pushAsync("broker4:8080", getCpuLoad(ctx, 80, "broker4:8080"));
+ brokerLoadDataStore.pushAsync("broker5:8080", getCpuLoad(ctx, 90, "broker5:8080"));
return ctx;
}
@@ -192,9 +193,9 @@ public class TransferShedderTest {
Random rand = new Random();
for (int i = 0; i < clusterSize; i++) {
int brokerLoad = rand.nextInt(1000);
- brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx, brokerLoad, "broker" + i));
+ brokerLoadDataStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx, brokerLoad, "broker" + i + ":8080"));
int bundleLoad = rand.nextInt(brokerLoad + 1);
- topBundlesLoadDataStore.pushAsync("broker" + i, getTopBundlesLoad("my-tenant/my-namespace" + i,
+ topBundlesLoadDataStore.pushAsync("broker" + i + ":8080", getTopBundlesLoad("my-tenant/my-namespace" + i,
bundleLoad, brokerLoad - bundleLoad));
}
return ctx;
@@ -209,14 +210,14 @@ public class TransferShedderTest {
int i = 0;
for (; i < clusterSize-1; i++) {
int brokerLoad = 1;
- topBundlesLoadDataStore.pushAsync("broker" + i, getTopBundlesLoad("my-tenant/my-namespace" + i,
+ topBundlesLoadDataStore.pushAsync("broker" + i + ":8080", getTopBundlesLoad("my-tenant/my-namespace" + i,
300_000, 700_000));
- brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx, brokerLoad, "broker" + i));
+ brokerLoadDataStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx, brokerLoad, "broker" + i + ":8080"));
}
int brokerLoad = 100;
- topBundlesLoadDataStore.pushAsync("broker" + i, getTopBundlesLoad("my-tenant/my-namespace" + i,
+ topBundlesLoadDataStore.pushAsync("broker" + i + ":8080", getTopBundlesLoad("my-tenant/my-namespace" + i,
30_000_000, 70_000_000));
- brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx, brokerLoad, "broker" + i));
+ brokerLoadDataStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx, brokerLoad, "broker" + i + ":8080"));
return ctx;
}
@@ -230,21 +231,21 @@ public class TransferShedderTest {
int i = 0;
for (; i < clusterSize-2; i++) {
int brokerLoad = 98;
- topBundlesLoadDataStore.pushAsync("broker" + i, getTopBundlesLoad("my-tenant/my-namespace" + i,
+ topBundlesLoadDataStore.pushAsync("broker" + i + ":8080", getTopBundlesLoad("my-tenant/my-namespace" + i,
30_000_000, 70_000_000));
- brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx, brokerLoad, "broker" + i));
+ brokerLoadDataStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx, brokerLoad, "broker" + i + ":8080"));
}
int brokerLoad = 99;
- topBundlesLoadDataStore.pushAsync("broker" + i, getTopBundlesLoad("my-tenant/my-namespace" + i,
+ topBundlesLoadDataStore.pushAsync("broker" + i + ":8080", getTopBundlesLoad("my-tenant/my-namespace" + i,
30_000_000, 70_000_000));
- brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx, brokerLoad, "broker" + i));
+ brokerLoadDataStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx, brokerLoad, "broker" + i + ":8080"));
i++;
brokerLoad = 1;
- topBundlesLoadDataStore.pushAsync("broker" + i, getTopBundlesLoad("my-tenant/my-namespace" + i,
+ topBundlesLoadDataStore.pushAsync("broker" + i + ":8080", getTopBundlesLoad("my-tenant/my-namespace" + i,
300_000, 700_000));
- brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx, brokerLoad, "broker" + i));
+ brokerLoadDataStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx, brokerLoad, "broker" + i + ":8080"));
return ctx;
}
@@ -474,11 +475,11 @@ public class TransferShedderTest {
BrokerRegistry brokerRegistry = mock(BrokerRegistry.class);
doReturn(CompletableFuture.completedFuture(Map.of(
- "broker1", getMockBrokerLookupData(),
- "broker2", getMockBrokerLookupData(),
- "broker3", getMockBrokerLookupData(),
- "broker4", getMockBrokerLookupData(),
- "broker5", getMockBrokerLookupData()
+ "broker1:8080", getMockBrokerLookupData(),
+ "broker2:8080", getMockBrokerLookupData(),
+ "broker3:8080", getMockBrokerLookupData(),
+ "broker4:8080", getMockBrokerLookupData(),
+ "broker5:8080", getMockBrokerLookupData()
))).when(brokerRegistry).getAvailableBrokerLookupDataAsync();
doReturn(conf).when(ctx).brokerConfiguration();
doReturn(brokerLoadDataStore).when(ctx).brokerLoadDataStore();
@@ -526,11 +527,11 @@ public class TransferShedderTest {
var ctx = getContext();
var brokerLoadDataStore = ctx.brokerLoadDataStore();
- brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx, 2, "broker1"));
- brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx, 4, "broker2"));
- brokerLoadDataStore.pushAsync("broker3", getCpuLoad(ctx, 6, "broker3"));
- brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx, 80, "broker4"));
- brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx, 90, "broker5"));
+ brokerLoadDataStore.pushAsync("broker1:8080", getCpuLoad(ctx, 2, "broker1:8080"));
+ brokerLoadDataStore.pushAsync("broker2:8080", getCpuLoad(ctx, 4, "broker2:8080"));
+ brokerLoadDataStore.pushAsync("broker3:8080", getCpuLoad(ctx, 6, "broker3:8080"));
+ brokerLoadDataStore.pushAsync("broker4:8080", getCpuLoad(ctx, 80, "broker4:8080"));
+ brokerLoadDataStore.pushAsync("broker5:8080", getCpuLoad(ctx, 90, "broker5:8080"));
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
@@ -550,11 +551,11 @@ public class TransferShedderTest {
assertEquals(res.size(), 2);
- FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker1").get(), "updatedAt", 0, true);
- FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker2").get(), "updatedAt", 0, true);
- FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker3").get(), "updatedAt", 0, true);
- FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker4").get(), "updatedAt", 0, true);
- FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker5").get(), "updatedAt", 0, true);
+ FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker1:8080").get(), "updatedAt", 0, true);
+ FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker2:8080").get(), "updatedAt", 0, true);
+ FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker3:8080").get(), "updatedAt", 0, true);
+ FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker4:8080").get(), "updatedAt", 0, true);
+ FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker5:8080").get(), "updatedAt", 0, true);
res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
@@ -571,20 +572,20 @@ public class TransferShedderTest {
Map<String, Long> recentlyUnloadedBrokers = new HashMap<>();
var oldTS = System.currentTimeMillis() - ctx.brokerConfiguration()
.getLoadBalancerBrokerLoadDataTTLInSeconds() * 1001;
- recentlyUnloadedBrokers.put("broker1", oldTS);
+ recentlyUnloadedBrokers.put("broker1:8080", oldTS);
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), recentlyUnloadedBrokers);
var expected = new HashSet<UnloadDecision>();
- expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+ expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
Success, Overloaded));
- expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")),
+ expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
var now = System.currentTimeMillis();
- recentlyUnloadedBrokers.put("broker1", now);
+ recentlyUnloadedBrokers.put("broker1:8080", now);
res = transferShedder.findBundlesForUnloading(ctx, Map.of(), recentlyUnloadedBrokers);
assertTrue(res.isEmpty());
@@ -604,9 +605,9 @@ public class TransferShedderTest {
recentlyUnloadedBundles.put(bundleD2, now);
var res = transferShedder.findBundlesForUnloading(ctx, recentlyUnloadedBundles, Map.of());
var expected = new HashSet<UnloadDecision>();
- expected.add(new UnloadDecision(new Unload("broker3",
+ expected.add(new UnloadDecision(new Unload("broker3:8080",
"my-tenant/my-namespaceC/0x00000000_0x0FFFFFFF",
- Optional.of("broker1")),
+ Optional.of("broker1:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
@@ -639,13 +640,13 @@ public class TransferShedderTest {
isolationPoliciesHelper, antiAffinityGroupPolicyHelper));
setIsolationPolicies(allocationPoliciesSpy, "my-tenant/my-namespaceE",
- Set.of("broker5"), Set.of(), Set.of(), 1);
+ Set.of("broker5:8080"), Set.of(), Set.of(), 1);
var ctx = setupContext();
ctx.brokerConfiguration().setLoadBalancerSheddingBundlesWithPoliciesEnabled(true);
doReturn(ctx.brokerConfiguration()).when(pulsar).getConfiguration();
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
var expected = new HashSet<UnloadDecision>();
- expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker1")),
+ expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker1:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
@@ -656,7 +657,7 @@ public class TransferShedderTest {
res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
expected = new HashSet<>();
- expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.empty()),
+ expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.empty()),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
@@ -777,7 +778,7 @@ public class TransferShedderTest {
}).when(antiAffinityGroupPolicyHelper).filterAsync(any(), any());
var res2 = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
var expected2 = new HashSet<>();
- expected2.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+ expected2.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
Success, Overloaded));
assertEquals(res2, expected2);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
@@ -853,22 +854,22 @@ public class TransferShedderTest {
var ctx = getContext();
BrokerRegistry brokerRegistry = mock(BrokerRegistry.class);
doReturn(CompletableFuture.completedFuture(Map.of(
- "broker1", mock(BrokerLookupData.class),
- "broker2", mock(BrokerLookupData.class),
- "broker3", mock(BrokerLookupData.class)
+ "broker1:8080", mock(BrokerLookupData.class),
+ "broker2:8080", mock(BrokerLookupData.class),
+ "broker3:8080", mock(BrokerLookupData.class)
))).when(brokerRegistry).getAvailableBrokerLookupDataAsync();
doReturn(brokerRegistry).when(ctx).brokerRegistry();
ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
var brokerLoadDataStore = ctx.brokerLoadDataStore();
- brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx, 10, "broker1"));
- brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx, 20, "broker2"));
- brokerLoadDataStore.pushAsync("broker3", getCpuLoad(ctx, 30, "broker3"));
+ brokerLoadDataStore.pushAsync("broker1:8080", getCpuLoad(ctx, 10, "broker1:8080"));
+ brokerLoadDataStore.pushAsync("broker2:8080", getCpuLoad(ctx, 20, "broker2:8080"));
+ brokerLoadDataStore.pushAsync("broker3:8080", getCpuLoad(ctx, 30, "broker3:8080"));
var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
- topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("my-tenant/my-namespaceA", 30, 30));
- topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("my-tenant/my-namespaceB", 40, 40));
- topBundlesLoadDataStore.pushAsync("broker3", getTopBundlesLoad("my-tenant/my-namespaceC", 50, 50));
+ topBundlesLoadDataStore.pushAsync("broker1:8080", getTopBundlesLoad("my-tenant/my-namespaceA", 30, 30));
+ topBundlesLoadDataStore.pushAsync("broker2:8080", getTopBundlesLoad("my-tenant/my-namespaceB", 40, 40));
+ topBundlesLoadDataStore.pushAsync("broker3:8080", getTopBundlesLoad("my-tenant/my-namespaceC", 50, 50));
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
@@ -884,11 +885,11 @@ public class TransferShedderTest {
TransferShedder transferShedder = new TransferShedder(counter);
var ctx = setupContext();
var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
- topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("my-tenant/my-namespaceA", 1));
- topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("my-tenant/my-namespaceB", 2));
- topBundlesLoadDataStore.pushAsync("broker3", getTopBundlesLoad("my-tenant/my-namespaceC", 6));
- topBundlesLoadDataStore.pushAsync("broker4", getTopBundlesLoad("my-tenant/my-namespaceD", 10));
- topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("my-tenant/my-namespaceE", 70));
+ topBundlesLoadDataStore.pushAsync("broker1:8080", getTopBundlesLoad("my-tenant/my-namespaceA", 1));
+ topBundlesLoadDataStore.pushAsync("broker2:8080", getTopBundlesLoad("my-tenant/my-namespaceB", 2));
+ topBundlesLoadDataStore.pushAsync("broker3:8080", getTopBundlesLoad("my-tenant/my-namespaceC", 6));
+ topBundlesLoadDataStore.pushAsync("broker4:8080", getTopBundlesLoad("my-tenant/my-namespaceD", 10));
+ topBundlesLoadDataStore.pushAsync("broker5:8080", getTopBundlesLoad("my-tenant/my-namespaceE", 70));
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
assertTrue(res.isEmpty());
@@ -903,14 +904,14 @@ public class TransferShedderTest {
TransferShedder transferShedder = new TransferShedder(counter);
var ctx = setupContext();
var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
- topBundlesLoadDataStore.pushAsync("broker4", getTopBundlesLoad("my-tenant/my-namespaceD", 1000000000, 1000000000));
- topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("my-tenant/my-namespaceE", 1000000000, 1000000000));
+ topBundlesLoadDataStore.pushAsync("broker4:8080", getTopBundlesLoad("my-tenant/my-namespaceD", 1000000000, 1000000000));
+ topBundlesLoadDataStore.pushAsync("broker5:8080", getTopBundlesLoad("my-tenant/my-namespaceE", 1000000000, 1000000000));
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
var expected = new HashSet<UnloadDecision>();
- expected.add(new UnloadDecision(new Unload("broker3",
+ expected.add(new UnloadDecision(new Unload("broker3:8080",
"my-tenant/my-namespaceC/0x00000000_0x0FFFFFFF",
- Optional.of("broker1")),
+ Optional.of("broker1:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
@@ -924,12 +925,12 @@ public class TransferShedderTest {
TransferShedder transferShedder = new TransferShedder(counter);
var ctx = setupContext();
var brokerLoadDataStore = ctx.brokerLoadDataStore();
- brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx, 55, "broker4"));
- brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx, 65, "broker5"));
+ brokerLoadDataStore.pushAsync("broker4:8080", getCpuLoad(ctx, 55, "broker4:8080"));
+ brokerLoadDataStore.pushAsync("broker5:8080", getCpuLoad(ctx, 65, "broker5:8080"));
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
var expected = new HashSet<UnloadDecision>();
- expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+ expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), 0.26400000000000007);
@@ -945,43 +946,43 @@ public class TransferShedderTest {
var brokerRegistry = mock(BrokerRegistry.class);
doReturn(brokerRegistry).when(ctx).brokerRegistry();
doReturn(CompletableFuture.completedFuture(Map.of(
- "broker1", mock(BrokerLookupData.class),
- "broker2", mock(BrokerLookupData.class)
+ "broker1:8080", mock(BrokerLookupData.class),
+ "broker2:8080", mock(BrokerLookupData.class)
))).when(brokerRegistry).getAvailableBrokerLookupDataAsync();
var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
- topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("my-tenant/my-namespaceA", 1000000, 2000000, 3000000, 4000000, 5000000));
- topBundlesLoadDataStore.pushAsync("broker2",
+ topBundlesLoadDataStore.pushAsync("broker1:8080", getTopBundlesLoad("my-tenant/my-namespaceA", 1000000, 2000000, 3000000, 4000000, 5000000));
+ topBundlesLoadDataStore.pushAsync("broker2:8080",
getTopBundlesLoad("my-tenant/my-namespaceB", 100000000, 180000000, 220000000, 250000000, 250000000));
var brokerLoadDataStore = ctx.brokerLoadDataStore();
- brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx, 10, "broker1"));
- brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx, 1000, "broker2"));
+ brokerLoadDataStore.pushAsync("broker1:8080", getCpuLoad(ctx, 10, "broker1:8080"));
+ brokerLoadDataStore.pushAsync("broker2:8080", getCpuLoad(ctx, 1000, "broker2:8080"));
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
var expected = new HashSet<UnloadDecision>();
expected.add(new UnloadDecision(
- new Unload("broker2", "my-tenant/my-namespaceB/0x00000000_0x1FFFFFFF", Optional.of("broker1")),
+ new Unload("broker2:8080", "my-tenant/my-namespaceB/0x00000000_0x1FFFFFFF", Optional.of("broker1:8080")),
Success, Overloaded));
expected.add(new UnloadDecision(
- new Unload("broker2", "my-tenant/my-namespaceB/0x1FFFFFFF_0x2FFFFFFF", Optional.of("broker1")),
+ new Unload("broker2:8080", "my-tenant/my-namespaceB/0x1FFFFFFF_0x2FFFFFFF", Optional.of("broker1:8080")),
Success, Overloaded));
expected.add(new UnloadDecision(
- new Unload("broker2", "my-tenant/my-namespaceB/0x2FFFFFFF_0x3FFFFFFF", Optional.of("broker1")),
+ new Unload("broker2:8080", "my-tenant/my-namespaceB/0x2FFFFFFF_0x3FFFFFFF", Optional.of("broker1:8080")),
Success, Overloaded));
expected.add(new UnloadDecision(
- new Unload("broker1", "my-tenant/my-namespaceA/0x00000000_0x1FFFFFFF", Optional.of("broker2")),
+ new Unload("broker1:8080", "my-tenant/my-namespaceA/0x00000000_0x1FFFFFFF", Optional.of("broker2:8080")),
Success, Overloaded));
expected.add(new UnloadDecision(
- new Unload("broker1", "my-tenant/my-namespaceA/0x1FFFFFFF_0x2FFFFFFF", Optional.of("broker2")),
+ new Unload("broker1:8080", "my-tenant/my-namespaceA/0x1FFFFFFF_0x2FFFFFFF", Optional.of("broker2:8080")),
Success, Overloaded));
expected.add(new UnloadDecision(
- new Unload("broker1","my-tenant/my-namespaceA/0x2FFFFFFF_0x3FFFFFFF", Optional.of("broker2")),
+ new Unload("broker1:8080","my-tenant/my-namespaceA/0x2FFFFFFF_0x3FFFFFFF", Optional.of("broker2:8080")),
Success, Overloaded));
expected.add(new UnloadDecision(
- new Unload("broker1","my-tenant/my-namespaceA/0x3FFFFFFF_0x4FFFFFFF", Optional.of("broker2")),
+ new Unload("broker1:8080","my-tenant/my-namespaceA/0x3FFFFFFF_0x4FFFFFFF", Optional.of("broker2:8080")),
Success, Overloaded));
assertEquals(counter.getLoadAvg(), 5.05);
assertEquals(counter.getLoadStd(), 4.95);
@@ -1001,20 +1002,20 @@ public class TransferShedderTest {
var brokerRegistry = mock(BrokerRegistry.class);
doReturn(brokerRegistry).when(ctx).brokerRegistry();
doReturn(CompletableFuture.completedFuture(Map.of(
- "broker1", mock(BrokerLookupData.class),
- "broker2", mock(BrokerLookupData.class)
+ "broker1:8080", mock(BrokerLookupData.class),
+ "broker2:8080", mock(BrokerLookupData.class)
))).when(brokerRegistry).getAvailableBrokerLookupDataAsync();
var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
- topBundlesLoadDataStore.pushAsync("broker1",
+ topBundlesLoadDataStore.pushAsync("broker1:8080",
getTopBundlesLoad("my-tenant/my-namespaceA", 1, 500000000));
- topBundlesLoadDataStore.pushAsync("broker2",
+ topBundlesLoadDataStore.pushAsync("broker2:8080",
getTopBundlesLoad("my-tenant/my-namespaceB", 500000000, 500000000));
var brokerLoadDataStore = ctx.brokerLoadDataStore();
- brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx, 50, "broker1"));
- brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx, 100, "broker2"));
+ brokerLoadDataStore.pushAsync("broker1:8080", getCpuLoad(ctx, 50, "broker1:8080"));
+ brokerLoadDataStore.pushAsync("broker2:8080", getCpuLoad(ctx, 100, "broker2:8080"));
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
@@ -1032,24 +1033,24 @@ public class TransferShedderTest {
var brokerRegistry = mock(BrokerRegistry.class);
doReturn(brokerRegistry).when(ctx).brokerRegistry();
doReturn(CompletableFuture.completedFuture(Map.of(
- "broker1", mock(BrokerLookupData.class),
- "broker2", mock(BrokerLookupData.class)
+ "broker1:8080", mock(BrokerLookupData.class),
+ "broker2:8080", mock(BrokerLookupData.class)
))).when(brokerRegistry).getAvailableBrokerLookupDataAsync();
var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
- topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("my-tenant/my-namespaceA", 1000000, 2000000, 3000000, 4000000, 5000000));
- topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("my-tenant/my-namespaceB", 490000000, 510000000));
+ topBundlesLoadDataStore.pushAsync("broker1:8080", getTopBundlesLoad("my-tenant/my-namespaceA", 1000000, 2000000, 3000000, 4000000, 5000000));
+ topBundlesLoadDataStore.pushAsync("broker2:8080", getTopBundlesLoad("my-tenant/my-namespaceB", 490000000, 510000000));
var brokerLoadDataStore = ctx.brokerLoadDataStore();
- brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx, 10, "broker1"));
- brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx, 1000, "broker2"));
+ brokerLoadDataStore.pushAsync("broker1:8080", getCpuLoad(ctx, 10, "broker1:8080"));
+ brokerLoadDataStore.pushAsync("broker2:8080", getCpuLoad(ctx, 1000, "broker2:8080"));
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
var expected = new HashSet<UnloadDecision>();
expected.add(new UnloadDecision(
- new Unload("broker2", "my-tenant/my-namespaceB/0x00000000_0x0FFFFFFF", Optional.of("broker1")),
+ new Unload("broker2:8080", "my-tenant/my-namespaceB/0x00000000_0x0FFFFFFF", Optional.of("broker1:8080")),
Success, Overloaded));
assertEquals(counter.getLoadAvg(), 5.05);
assertEquals(counter.getLoadStd(), 4.95);
@@ -1070,30 +1071,30 @@ public class TransferShedderTest {
var brokerRegistry = mock(BrokerRegistry.class);
doReturn(brokerRegistry).when(ctx).brokerRegistry();
doReturn(CompletableFuture.completedFuture(Map.of(
- "broker1", mock(BrokerLookupData.class),
- "broker2", mock(BrokerLookupData.class)
+ "broker1:8080", mock(BrokerLookupData.class),
+ "broker2:8080", mock(BrokerLookupData.class)
))).when(brokerRegistry).getAvailableBrokerLookupDataAsync();
var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
- topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("my-tenant/my-namespaceA", 2400000, 2400000));
- topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("my-tenant/my-namespaceB", 5000000, 5000000));
+ topBundlesLoadDataStore.pushAsync("broker1:8080", getTopBundlesLoad("my-tenant/my-namespaceA", 2400000, 2400000));
+ topBundlesLoadDataStore.pushAsync("broker2:8080", getTopBundlesLoad("my-tenant/my-namespaceB", 5000000, 5000000));
var brokerLoadDataStore = ctx.brokerLoadDataStore();
- brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx, 48, "broker1"));
- brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx, 100, "broker2"));
+ brokerLoadDataStore.pushAsync("broker1:8080", getCpuLoad(ctx, 48, "broker1:8080"));
+ brokerLoadDataStore.pushAsync("broker2:8080", getCpuLoad(ctx, 100, "broker2:8080"));
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
var expected = new HashSet<UnloadDecision>();
expected.add(new UnloadDecision(
- new Unload("broker1",
- res.stream().filter(x -> x.getUnload().sourceBroker().equals("broker1")).findFirst().get()
- .getUnload().serviceUnit(), Optional.of("broker2")),
+ new Unload("broker1:8080",
+ res.stream().filter(x -> x.getUnload().sourceBroker().equals("broker1:8080")).findFirst().get()
+ .getUnload().serviceUnit(), Optional.of("broker2:8080")),
Success, Overloaded));
expected.add(new UnloadDecision(
- new Unload("broker2",
- res.stream().filter(x -> x.getUnload().sourceBroker().equals("broker2")).findFirst().get()
- .getUnload().serviceUnit(), Optional.of("broker1")),
+ new Unload("broker2:8080",
+ res.stream().filter(x -> x.getUnload().sourceBroker().equals("broker2:8080")).findFirst().get()
+ .getUnload().serviceUnit(), Optional.of("broker1:8080")),
Success, Overloaded));
assertEquals(counter.getLoadAvg(), 0.74);
assertEquals(counter.getLoadStd(), 0.26);
@@ -1111,18 +1112,18 @@ public class TransferShedderTest {
var ctx = setupContext();
var brokerLoadDataStore = ctx.brokerLoadDataStore();
- var load = getCpuLoad(ctx, 4, "broker2");
+ var load = getCpuLoad(ctx, 4, "broker2:8080");
FieldUtils.writeDeclaredField(load,"msgThroughputEMA", 0, true);
- brokerLoadDataStore.pushAsync("broker2", load);
- brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx, 55, "broker4"));
- brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx, 65, "broker5"));
+ brokerLoadDataStore.pushAsync("broker2:8080", load);
+ brokerLoadDataStore.pushAsync("broker4:8080", getCpuLoad(ctx, 55, "broker4:8080"));
+ brokerLoadDataStore.pushAsync("broker5:8080", getCpuLoad(ctx, 65, "broker5:8080"));
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
var expected = new HashSet<UnloadDecision>();
- expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+ expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
Success, Overloaded));
- expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")),
+ expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
Success, Underloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), 0.26400000000000007);
@@ -1136,17 +1137,17 @@ public class TransferShedderTest {
var ctx = setupContext();
var brokerLoadDataStore = ctx.brokerLoadDataStore();
- var load = getCpuLoad(ctx, 3 , "broker2");
- brokerLoadDataStore.pushAsync("broker2", load);
- brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx, 55, "broker4"));
- brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx, 65, "broker5"));
+ var load = getCpuLoad(ctx, 3 , "broker2:8080");
+ brokerLoadDataStore.pushAsync("broker2:8080", load);
+ brokerLoadDataStore.pushAsync("broker4:8080", getCpuLoad(ctx, 55, "broker4:8080"));
+ brokerLoadDataStore.pushAsync("broker5:8080", getCpuLoad(ctx, 65, "broker5:8080"));
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
var expected = new HashSet<UnloadDecision>();
- expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+ expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
Success, Overloaded));
- expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")),
+ expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
Success, Underloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), 0.262);
@@ -1162,9 +1163,9 @@ public class TransferShedderTest {
.setLoadBalancerMaxNumberOfBrokerSheddingPerCycle(10);
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
var expected = new HashSet<UnloadDecision>();
- expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+ expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
Success, Overloaded));
- expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")),
+ expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
@@ -1188,9 +1189,9 @@ public class TransferShedderTest {
}
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
var expected = new HashSet<UnloadDecision>();
- expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+ expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
Success, Overloaded));
- expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")),
+ expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
@@ -1203,13 +1204,13 @@ public class TransferShedderTest {
TransferShedder transferShedder = new TransferShedder(counter);
var ctx = setupContext();
var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
- topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("my-tenant/my-namespaceE", 2000000, 3000000));
+ topBundlesLoadDataStore.pushAsync("broker5:8080", getTopBundlesLoad("my-tenant/my-namespaceE", 2000000, 3000000));
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
var expected = new HashSet<UnloadDecision>();
- expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+ expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
Success, Overloaded));
- expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")),
+ expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
@@ -1223,14 +1224,14 @@ public class TransferShedderTest {
var ctx = setupContext();
var brokerLoadDataStore = ctx.brokerLoadDataStore();
- brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx, 200, "broker4"));
- brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx, 1000, "broker5"));
+ brokerLoadDataStore.pushAsync("broker4:8080", getCpuLoad(ctx, 200, "broker4:8080"));
+ brokerLoadDataStore.pushAsync("broker5:8080", getCpuLoad(ctx, 1000, "broker5:8080"));
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
var expected = new HashSet<UnloadDecision>();
- expected.add(new UnloadDecision(new Unload("broker5", bundleE1, Optional.of("broker1")),
+ expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
Success, Overloaded));
- expected.add(new UnloadDecision(new Unload("broker4", bundleD1, Optional.of("broker2")),
+ expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), 2.4240000000000004);
@@ -1264,13 +1265,16 @@ public class TransferShedderTest {
TransferShedder transferShedder = new TransferShedder(counter);
var ctx = setupContextLoadSkewedOverload(100);
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
- var expected = new HashSet<UnloadDecision>();
- expected.add(new UnloadDecision(
- new Unload("broker99", "my-tenant/my-namespace99/0x00000000_0x0FFFFFFF",
- Optional.of("broker52")), Success, Overloaded));
- assertEquals(res, expected);
- assertEquals(counter.getLoadAvg(), 0.019900000000000008);
- assertEquals(counter.getLoadStd(), 0.09850375627355534);
+ Assertions.assertThat(res).isIn(
+ Set.of(new UnloadDecision(
+ new Unload("broker99:8080", "my-tenant/my-namespace99/0x00000000_0x0FFFFFFF",
+ Optional.of("broker52:8080")), Success, Overloaded)),
+ Set.of(new UnloadDecision(
+ new Unload("broker99:8080", "my-tenant/my-namespace99/0x00000000_0x0FFFFFFF",
+ Optional.of("broker83:8080")), Success, Overloaded))
+ );
+ assertEquals(counter.getLoadAvg(), 0.019900000000000008, 0.00001);
+ assertEquals(counter.getLoadStd(), 0.09850375627355534, 0.00001);
}
@Test
@@ -1281,11 +1285,11 @@ public class TransferShedderTest {
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
var expected = new HashSet<UnloadDecision>();
expected.add(new UnloadDecision(
- new Unload("broker98", "my-tenant/my-namespace98/0x00000000_0x0FFFFFFF",
- Optional.of("broker99")), Success, Underloaded));
+ new Unload("broker98:8080", "my-tenant/my-namespace98/0x00000000_0x0FFFFFFF",
+ Optional.of("broker99:8080")), Success, Underloaded));
assertEquals(res, expected);
- assertEquals(counter.getLoadAvg(), 0.9704000000000005);
- assertEquals(counter.getLoadStd(), 0.09652895938523735);
+ assertEquals(counter.getLoadAvg(), 0.9704000000000005, 0.00001);
+ assertEquals(counter.getLoadStd(), 0.09652895938523735, 0.00001);
}
@Test
@@ -1301,13 +1305,13 @@ public class TransferShedderTest {
double[] loads = new double[numBrokers];
final Map<String, BrokerLookupData> availableBrokers = new HashMap<>();
for (int i = 0; i < loads.length; i++) {
- availableBrokers.put("broker" + i, mock(BrokerLookupData.class));
+ availableBrokers.put("broker" + i + ":8080", mock(BrokerLookupData.class));
}
stats.update(loadStore, availableBrokers, Map.of(), conf);
var brokerLoadDataStore = ctx.brokerLoadDataStore();
for (int i = 0; i < loads.length; i++) {
- loads[i] = loadStore.get("broker" + i).get().getWeightedMaxEMA();
+ loads[i] = loadStore.get("broker" + i + ":8080").get().getWeightedMaxEMA();
}
int i = 0;
int j = loads.length - 1;
@@ -1342,8 +1346,8 @@ public class TransferShedderTest {
var conf = ctx.brokerConfiguration();
final Map<String, BrokerLookupData> availableBrokers = new HashMap<>();
for (int i = 0; i < loads.length; i++) {
- availableBrokers.put("broker" + i, mock(BrokerLookupData.class));
- loadStore.pushAsync("broker" + i, getCpuLoad(ctx, loads[i], "broker" + i));
+ availableBrokers.put("broker" + i + ":8080", mock(BrokerLookupData.class));
+ loadStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx, loads[i], "broker" + i + ":8080"));
}
stats.update(loadStore, availableBrokers, Map.of(), conf);
@@ -1361,8 +1365,8 @@ public class TransferShedderTest {
var conf = ctx.brokerConfiguration();
final Map<String, BrokerLookupData> availableBrokers = new HashMap<>();
for (int i = 0; i < loads.length; i++) {
- availableBrokers.put("broker" + i, mock(BrokerLookupData.class));
- loadStore.pushAsync("broker" + i, getCpuLoad(ctx, loads[i], "broker" + i));
+ availableBrokers.put("broker" + i + ":8080", mock(BrokerLookupData.class));
+ loadStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx, loads[i], "broker" + i + ":8080"));
}
stats.update(loadStore, availableBrokers, Map.of(), conf);
assertEquals(stats.avg(), 3.9449999999999994);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index d48a56491b8..b924a59bf7d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -23,7 +23,6 @@ import static org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIM
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -31,6 +30,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
@@ -84,16 +84,17 @@ import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
-import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
-import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -120,12 +121,9 @@ public class ModularLoadManagerImplTest {
private PulsarService pulsar3;
- private String primaryHost;
+ private String primaryBrokerId;
- private String primaryTlsHost;
- private String secondaryHost;
-
- private String secondaryTlsHost;
+ private String secondaryBrokerId;
private NamespaceBundleFactory nsFactory;
@@ -183,8 +181,7 @@ public class ModularLoadManagerImplTest {
pulsar1 = new PulsarService(config1);
pulsar1.start();
- primaryHost = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTP().get());
- primaryTlsHost = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTPS().get());
+ primaryBrokerId = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTPS().get());
url1 = new URL(pulsar1.getWebServiceAddress());
admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
@@ -218,8 +215,7 @@ public class ModularLoadManagerImplTest {
config.setBrokerServicePortTls(Optional.of(0));
pulsar3 = new PulsarService(config);
- secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get());
- secondaryTlsHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTPS().get());
+ secondaryBrokerId = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTPS().get());
url2 = new URL(pulsar2.getWebServiceAddress());
admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
@@ -239,7 +235,7 @@ public class ModularLoadManagerImplTest {
pulsar2.close();
pulsar1.close();
-
+
if (pulsar3.isRunning()) {
pulsar3.close();
}
@@ -270,7 +266,7 @@ public class ModularLoadManagerImplTest {
for (int i = 0; i < 2; ++i) {
final ServiceUnitId serviceUnit = makeBundle(Integer.toString(i));
final String broker = primaryLoadManager.selectBrokerForAssignment(serviceUnit).get();
- if (broker.equals(primaryHost)) {
+ if (broker.equals(primaryBrokerId)) {
foundFirst = true;
} else {
foundSecond = true;
@@ -286,12 +282,12 @@ public class ModularLoadManagerImplTest {
LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData");
// Make sure the second broker is not in the internal map.
- Awaitility.await().untilAsserted(() -> assertFalse(loadData.getBrokerData().containsKey(secondaryHost)));
+ Awaitility.await().untilAsserted(() -> assertFalse(loadData.getBrokerData().containsKey(secondaryBrokerId)));
// Try 5 more selections, ensure they all go to the first broker.
for (int i = 2; i < 7; ++i) {
final ServiceUnitId serviceUnit = makeBundle(Integer.toString(i));
- assertEquals(primaryLoadManager.selectBrokerForAssignment(serviceUnit), primaryHost);
+ assertEquals(primaryLoadManager.selectBrokerForAssignment(serviceUnit), primaryBrokerId);
}
}
@@ -313,7 +309,7 @@ public class ModularLoadManagerImplTest {
// one bundle.
pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class).create(firstBundleDataPath, bundleData).join();
for (final NamespaceBundle bundle : bundles) {
- if (primaryLoadManager.selectBrokerForAssignment(bundle).equals(primaryHost)) {
+ if (primaryLoadManager.selectBrokerForAssignment(bundle).equals(primaryBrokerId)) {
++numAssignedToPrimary;
} else {
++numAssignedToSecondary;
@@ -327,52 +323,52 @@ public class ModularLoadManagerImplTest {
}
-
+
@Test
public void testBrokerAffinity() throws Exception {
// Start broker 3
pulsar3.start();
-
+
final String tenant = "test";
final String cluster = "test";
String namespace = tenant + "/" + cluster + "/" + "test";
String topic = "persistent://" + namespace + "/my-topic1";
- admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
+ admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(namespace, 16);
-
+
String topicLookup = admin1.lookups().lookupTopic(topic);
String bundleRange = admin1.lookups().getBundleRange(topic);
-
+
String brokerServiceUrl = pulsar1.getBrokerServiceUrl();
- String brokerUrl = pulsar1.getSafeWebServiceAddress();
+ String brokerId = pulsar1.getBrokerId();
log.debug("initial broker service url - {}", topicLookup);
Random rand=new Random();
-
+
if (topicLookup.equals(brokerServiceUrl)) {
int x = rand.nextInt(2);
if (x == 0) {
- brokerUrl = pulsar2.getSafeWebServiceAddress();
+ brokerId = pulsar2.getBrokerId();
brokerServiceUrl = pulsar2.getBrokerServiceUrl();
}
else {
- brokerUrl = pulsar3.getSafeWebServiceAddress();
+ brokerId = pulsar3.getBrokerId();
brokerServiceUrl = pulsar3.getBrokerServiceUrl();
}
}
- brokerUrl = brokerUrl.replaceFirst("http[s]?://", "");
- log.debug("destination broker service url - {}, broker url - {}", brokerServiceUrl, brokerUrl);
- String leaderServiceUrl = admin1.brokers().getLeaderBroker().getServiceUrl();
- log.debug("leader serviceUrl - {}, broker1 service url - {}", leaderServiceUrl, pulsar1.getSafeWebServiceAddress());
- //Make a call to broker which is not a leader
- if (!leaderServiceUrl.equals(pulsar1.getSafeWebServiceAddress())) {
- admin1.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerUrl);
+ log.debug("destination broker service url - {}, broker url - {}", brokerServiceUrl, brokerId);
+ String leaderBrokerId = admin1.brokers().getLeaderBroker().getBrokerId();
+ log.debug("leader lookup address - {}, broker1 lookup address - {}", leaderBrokerId,
+ pulsar1.getBrokerId());
+ // Make a call to broker which is not a leader
+ if (!leaderBrokerId.equals(pulsar1.getBrokerId())) {
+ admin1.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerId);
}
else {
- admin2.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerUrl);
+ admin2.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerId);
}
-
+
sleep(2000);
String topicLookupAfterUnload = admin1.lookups().lookupTopic(topic);
log.debug("final broker service url - {}", topicLookupAfterUnload);
@@ -444,9 +440,9 @@ public class ModularLoadManagerImplTest {
pulsar1.getConfiguration().setLoadBalancerEnabled(true);
final LoadData loadData = (LoadData) getField(primaryLoadManagerSpy, "loadData");
final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
- final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryTlsHost));
+ final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryBrokerId));
when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData);
- brokerDataMap.put(primaryTlsHost, brokerDataSpy1);
+ brokerDataMap.put(primaryBrokerId, brokerDataSpy1);
// Need to update all the bundle data for the shredder to see the spy.
primaryLoadManagerSpy.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080"));
@@ -464,7 +460,7 @@ public class ModularLoadManagerImplTest {
verify(namespacesSpy1, Mockito.times(1))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
assertEquals(bundleReference.get(), mockBundleName(2));
- assertEquals(selectedBrokerRef.get().get(), secondaryTlsHost);
+ assertEquals(selectedBrokerRef.get().get(), secondaryBrokerId);
primaryLoadManagerSpy.doLoadShedding();
// Now less expensive bundle will be unloaded (normally other bundle would move off and nothing would be
@@ -472,13 +468,13 @@ public class ModularLoadManagerImplTest {
verify(namespacesSpy1, Mockito.times(2))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
assertEquals(bundleReference.get(), mockBundleName(1));
- assertEquals(selectedBrokerRef.get().get(), secondaryTlsHost);
+ assertEquals(selectedBrokerRef.get().get(), secondaryBrokerId);
primaryLoadManagerSpy.doLoadShedding();
// Now both are in grace period: neither should be unloaded.
verify(namespacesSpy1, Mockito.times(2))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
- assertEquals(selectedBrokerRef.get().get(), secondaryTlsHost);
+ assertEquals(selectedBrokerRef.get().get(), secondaryBrokerId);
// Test bundle transfer to same broker
@@ -487,13 +483,11 @@ public class ModularLoadManagerImplTest {
verify(namespacesSpy1, Mockito.times(3))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
- doReturn(Optional.of(primaryHost)).when(primaryLoadManagerSpy).selectBroker(any());
loadData.getRecentlyUnloadedBundles().clear();
primaryLoadManagerSpy.doLoadShedding();
// The bundle shouldn't be unloaded because the broker is the same.
verify(namespacesSpy1, Mockito.times(4))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
-
}
// Test that ModularLoadManagerImpl will determine that writing local data to ZooKeeper is necessary if certain
@@ -610,10 +604,12 @@ public class ModularLoadManagerImplTest {
final String tenant = "my-property";
final String cluster = "use";
final String namespace = "my-ns";
- final String broker1Address = pulsar1.getAdvertisedAddress() + "0";
- final String broker2Address = pulsar2.getAdvertisedAddress() + "1";
- final String sharedBroker = "broker3";
- admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
+ String broker1Host = pulsar1.getAdvertisedAddress() + "0";
+ final String broker1Address = broker1Host + ":8080";
+ String broker2Host = pulsar2.getAdvertisedAddress() + "1";
+ final String broker2Address = broker2Host + ":8080";
+ final String sharedBroker = "broker3:8080";
+ admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(tenant + "/" + cluster + "/" + namespace);
@@ -621,8 +617,8 @@ public class ModularLoadManagerImplTest {
// set a new policy
String newPolicyJsonTemplate = "{\"namespaces\":[\"%s/%s/%s.*\"],\"primary\":[\"%s\"],"
+ "\"secondary\":[\"%s\"],\"auto_failover_policy\":{\"policy_type\":\"min_available\",\"parameters\":{\"min_limit\":%s,\"usage_threshold\":80}}}";
- String newPolicyJson = String.format(newPolicyJsonTemplate, tenant, cluster, namespace, broker1Address,
- broker2Address, 1);
+ String newPolicyJson = String.format(newPolicyJsonTemplate, tenant, cluster, namespace, broker1Host,
+ broker2Host, 1);
String newPolicyName = "my-ns-isolation-policies";
ObjectMapper jsonMapper = ObjectMapperFactory.create();
NamespaceIsolationDataImpl nsPolicyData = jsonMapper.readValue(newPolicyJson.getBytes(),
@@ -634,12 +630,12 @@ public class ModularLoadManagerImplTest {
ServiceUnitId serviceUnit = LoadBalancerTestingUtils.makeBundles(nsFactory, tenant, cluster, namespace, 1)[0];
BrokerTopicLoadingPredicate brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
- public boolean isEnablePersistentTopics(String brokerUrl) {
+ public boolean isEnablePersistentTopics(String brokerId) {
return true;
}
@Override
- public boolean isEnableNonPersistentTopics(String brokerUrl) {
+ public boolean isEnableNonPersistentTopics(String brokerId) {
return true;
}
};
@@ -671,8 +667,8 @@ public class ModularLoadManagerImplTest {
// (2) now we will have isolation policy : primary=broker1, secondary=broker2, minLimit=2
- newPolicyJson = String.format(newPolicyJsonTemplate, tenant, cluster, namespace, broker1Address,
- broker2Address, 2);
+ newPolicyJson = String.format(newPolicyJsonTemplate, tenant, cluster, namespace, broker1Host,
+ broker2Host, 2);
nsPolicyData = jsonMapper.readValue(newPolicyJson.getBytes(), NamespaceIsolationDataImpl.class);
admin1.clusters().createNamespaceIsolationPolicy("use", newPolicyName, nsPolicyData);
@@ -709,10 +705,12 @@ public class ModularLoadManagerImplTest {
final String tenant = "my-tenant";
final String namespace = "my-tenant/use/my-ns";
final String bundle = "0x00000000_0xffffffff";
- final String brokerAddress = pulsar1.getAdvertisedAddress();
- final String broker1Address = pulsar1.getAdvertisedAddress() + 1;
+ final String brokerHost = pulsar1.getAdvertisedAddress();
+ final String brokerAddress = brokerHost + ":8080";
+ final String broker1Host = pulsar1.getAdvertisedAddress() + "1";
+ final String broker1Address = broker1Host + ":8080";
- admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
+ admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(namespace);
@@ -727,12 +725,13 @@ public class ModularLoadManagerImplTest {
loadManager.updateAll();
// test1: no isolation policy
- assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace, bundle, primaryHost));
+ assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace, bundle, primaryBrokerId));
// test2: as isolation policy, there are not another broker to load the bundle.
String newPolicyJsonTemplate = "{\"namespaces\":[\"%s.*\"],\"primary\":[\"%s\"],"
+ "\"secondary\":[\"%s\"],\"auto_failover_policy\":{\"policy_type\":\"min_available\",\"parameters\":{\"min_limit\":%s,\"usage_threshold\":80}}}";
- String newPolicyJson = String.format(newPolicyJsonTemplate, namespace, broker1Address,broker1Address, 1);
+
+ String newPolicyJson = String.format(newPolicyJsonTemplate, namespace, broker1Host, broker1Host, 1);
String newPolicyName = "my-ns-isolation-policies";
ObjectMapper jsonMapper = ObjectMapperFactory.create();
NamespaceIsolationDataImpl nsPolicyData = jsonMapper.readValue(newPolicyJson.getBytes(),
@@ -741,11 +740,11 @@ public class ModularLoadManagerImplTest {
assertFalse(loadManager.shouldNamespacePoliciesUnload(namespace, bundle, broker1Address));
// test3: as isolation policy, there are another can load the bundle.
- String newPolicyJson1 = String.format(newPolicyJsonTemplate, namespace, brokerAddress,brokerAddress, 1);
+ String newPolicyJson1 = String.format(newPolicyJsonTemplate, namespace, brokerHost, brokerHost, 1);
NamespaceIsolationDataImpl nsPolicyData1 = jsonMapper.readValue(newPolicyJson1.getBytes(),
NamespaceIsolationDataImpl.class);
admin1.clusters().updateNamespaceIsolationPolicy(cluster, newPolicyName, nsPolicyData1);
- assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace, bundle, primaryHost));
+ assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace, bundle, primaryBrokerId));
producer.close();
}
@@ -762,7 +761,7 @@ public class ModularLoadManagerImplTest {
ServiceConfiguration config = new ServiceConfiguration();
config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config.setClusterName("use");
- config.setWebServicePort(Optional.of(0));
+ config.setWebServicePort(Optional.of(PortManager.nextLockedFreePort()));
config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -770,10 +769,12 @@ public class ModularLoadManagerImplTest {
PulsarService pulsar = new PulsarService(config);
// create znode using different zk-session
final String brokerZnode = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getAdvertisedAddress() + ":"
- + config.getWebServicePort();
- pulsar1.getLocalMetadataStore().put(brokerZnode, new byte[0], Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
+ + config.getWebServicePort().get();
+ pulsar1.getLocalMetadataStore()
+ .put(brokerZnode, new byte[0], Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
try {
pulsar.start();
+ fail("should have failed");
} catch (PulsarServerException e) {
//Ok.
}
@@ -812,7 +813,7 @@ public class ModularLoadManagerImplTest {
final String tenant = "my-tenant";
final String namespace = "my-ns";
NamespaceName ns = isV1 ? NamespaceName.get(tenant, cluster, namespace) : NamespaceName.get(tenant, namespace);
- admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
+ admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(ns.toString(), 16);
@@ -861,7 +862,7 @@ public class ModularLoadManagerImplTest {
final String topicName = tenant + "/" + namespace + "/" + "topic";
int bundleNumbers = 8;
- admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
+ admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(tenant + "/" + namespace, bundleNumbers);
@@ -911,7 +912,6 @@ public class ModularLoadManagerImplTest {
final Optional<ResourceUnit> leastLoaded = loadManagerWrapper.getLeastLoaded(bundleWillBeSplit);
assertFalse(leastLoaded.isEmpty());
- assertTrue(leastLoaded.get().getResourceId().startsWith("https"));
String bundleDataPath = BUNDLE_DATA_BASE_PATH + "/" + tenant + "/" + namespace;
CompletableFuture<List<String>> children = bundlesCache.getChildren(bundleDataPath);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index c22e49e5fea..a0313ef7436 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -353,14 +353,14 @@ public class NamespaceServiceTest extends BrokerTestBase {
@Test
public void testLoadReportDeserialize() throws Exception {
- final String candidateBroker1 = "http://localhost:8000";
- final String candidateBroker2 = "http://localhost:3000";
- LoadReport lr = new LoadReport(null, null, candidateBroker1, null);
- LocalBrokerData ld = new LocalBrokerData(null, null, candidateBroker2, null);
- URI uri1 = new URI(candidateBroker1);
- URI uri2 = new URI(candidateBroker2);
- String path1 = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri1.getHost(), uri1.getPort());
- String path2 = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri2.getHost(), uri2.getPort());
+ final String candidateBroker1 = "localhost:8000";
+ String broker1Url = "pulsar://localhost:6650";
+ final String candidateBroker2 = "localhost:3000";
+ String broker2Url = "pulsar://localhost:6660";
+ LoadReport lr = new LoadReport("http://" + candidateBroker1, null, broker1Url, null);
+ LocalBrokerData ld = new LocalBrokerData("http://" + candidateBroker2, null, broker2Url, null);
+ String path1 = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, candidateBroker1);
+ String path2 = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, candidateBroker2);
pulsar.getLocalMetadataStore().put(path1,
ObjectMapperFactory.getMapper().writer().writeValueAsBytes(lr),
@@ -379,23 +379,23 @@ public class NamespaceServiceTest extends BrokerTestBase {
.getAndSet(new ModularLoadManagerWrapper(new ModularLoadManagerImpl()));
oldLoadManager.stop();
LookupResult result2 = pulsar.getNamespaceService().createLookupResult(candidateBroker2, false, null).get();
- Assert.assertEquals(result1.getLookupData().getBrokerUrl(), candidateBroker1);
- Assert.assertEquals(result2.getLookupData().getBrokerUrl(), candidateBroker2);
+ Assert.assertEquals(result1.getLookupData().getBrokerUrl(), broker1Url);
+ Assert.assertEquals(result2.getLookupData().getBrokerUrl(), broker2Url);
System.out.println(result2);
}
@Test
public void testCreateLookupResult() throws Exception {
- final String candidateBroker = "pulsar://localhost:6650";
+ final String candidateBroker = "localhost:8080";
+ final String brokerUrl = "pulsar://localhost:6650";
final String listenerUrl = "pulsar://localhost:7000";
final String listenerUrlTls = "pulsar://localhost:8000";
final String listener = "listenerName";
Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
advertisedListeners.put(listener, AdvertisedListener.builder().brokerServiceUrl(new URI(listenerUrl)).brokerServiceUrlTls(new URI(listenerUrlTls)).build());
- LocalBrokerData ld = new LocalBrokerData(null, null, candidateBroker, null, advertisedListeners);
- URI uri = new URI(candidateBroker);
- String path = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri.getHost(), uri.getPort());
+ LocalBrokerData ld = new LocalBrokerData("http://" + candidateBroker, null, brokerUrl, null, advertisedListeners);
+ String path = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, candidateBroker);
pulsar.getLocalMetadataStore().put(path,
ObjectMapperFactory.getMapper().writer().writeValueAsBytes(ld),
@@ -405,7 +405,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
LookupResult noListener = pulsar.getNamespaceService().createLookupResult(candidateBroker, false, null).get();
LookupResult withListener = pulsar.getNamespaceService().createLookupResult(candidateBroker, false, listener).get();
- Assert.assertEquals(noListener.getLookupData().getBrokerUrl(), candidateBroker);
+ Assert.assertEquals(noListener.getLookupData().getBrokerUrl(), brokerUrl);
Assert.assertEquals(withListener.getLookupData().getBrokerUrl(), listenerUrl);
Assert.assertEquals(withListener.getLookupData().getBrokerUrlTls(), listenerUrlTls);
System.out.println(withListener);
@@ -687,7 +687,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
@Test
public void testHeartbeatNamespaceMatch() throws Exception {
- NamespaceName namespaceName = NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), conf);
+ NamespaceName namespaceName = NamespaceService.getHeartbeatNamespace(pulsar.getBrokerId(), conf);
NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName);
assertTrue(NamespaceService.isSystemServiceNamespace(
NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
@@ -707,7 +707,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
loadManagerField.setAccessible(true);
doReturn(true).when(loadManager).isCentralized();
- SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getSafeWebServiceAddress(), null);
+ SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getBrokerId(), null);
Optional<ResourceUnit> res = Optional.of(resourceUnit);
doReturn(res).when(loadManager).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager));
@@ -860,10 +860,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
public CompletableFuture<Void> registryBrokerDataChangeNotice() {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
- String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
- + (conf.getWebServicePort().isPresent() ? conf.getWebServicePort().get()
- : conf.getWebServicePortTls().get());
- String brokerDataPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+ String brokerDataPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getBrokerId();
pulsar.getLocalMetadataStore().registerListener(notice -> {
if (brokerDataPath.equals(notice.getPath())){
if (!completableFuture.isDone()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index f456a133d99..3600850974c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1534,9 +1534,9 @@ public class BrokerServiceTest extends BrokerTestBase {
assertTrue(brokerService.isSystemTopic(TRANSACTION_COORDINATOR_ASSIGN));
assertTrue(brokerService.isSystemTopic(TRANSACTION_COORDINATOR_LOG));
NamespaceName heartbeatNamespaceV1 = NamespaceService
- .getHeartbeatNamespace(pulsar.getLookupServiceAddress(), pulsar.getConfig());
+ .getHeartbeatNamespace(pulsar.getBrokerId(), pulsar.getConfig());
NamespaceName heartbeatNamespaceV2 = NamespaceService
- .getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(), pulsar.getConfig());
+ .getHeartbeatNamespaceV2(pulsar.getBrokerId(), pulsar.getConfig());
assertTrue(brokerService.isSystemTopic("persistent://" + heartbeatNamespaceV1.toString() + "/healthcheck"));
assertTrue(brokerService.isSystemTopic(heartbeatNamespaceV2.toString() + "/healthcheck"));
}
@@ -1726,11 +1726,11 @@ public class BrokerServiceTest extends BrokerTestBase {
}
@Test
- public void testGetLookupServiceAddress() throws Exception {
+ public void testGetBrokerId() throws Exception {
cleanup();
- setup();
conf.setWebServicePortTls(Optional.of(8081));
- assertEquals(pulsar.getLookupServiceAddress(), "localhost:8081");
+ setup();
+ assertEquals(pulsar.getBrokerId(), "localhost:8081");
resetState();
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index 84c4670f2bb..1549ba8d81c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -599,11 +599,11 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
super.baseSetup();
// init topic
NamespaceName heartbeatNamespaceV1 = NamespaceService
- .getHeartbeatNamespace(pulsar.getLookupServiceAddress(), pulsar.getConfig());
+ .getHeartbeatNamespace(pulsar.getBrokerId(), pulsar.getConfig());
final String healthCheckTopicV1 = "persistent://" + heartbeatNamespaceV1 + "/healthcheck";
NamespaceName heartbeatNamespaceV2 = NamespaceService
- .getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(), pulsar.getConfig());
+ .getHeartbeatNamespaceV2(pulsar.getBrokerId(), pulsar.getConfig());
final String healthCheckTopicV2 = "persistent://" + heartbeatNamespaceV2 + "/healthcheck";
admin.brokers().healthcheck(TopicVersion.V1);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 416d7ed0270..a2401ebe19a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -165,7 +165,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
@Test
public void testHealthCheckTopicNotOffload() throws Exception {
- NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
+ NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
pulsar.getConfig());
TopicName topicName = TopicName.get("persistent", namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
@@ -185,7 +185,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
@Test
public void testSystemNamespaceNotCreateChangeEventsTopic() throws Exception {
admin.brokers().healthcheck(TopicVersion.V2);
- NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
+ NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
pulsar.getConfig());
TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
Optional<Topic> optionalTopic = pulsar.getBrokerService()
@@ -203,7 +203,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
@Test
public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
admin.brokers().healthcheck(TopicVersion.V2);
- NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
+ NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
pulsar.getConfig());
TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
for (int partition = 0; partition < PARTITIONS; partition ++) {
@@ -218,7 +218,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
@Test
public void testHeartbeatTopicBeDeleted() throws Exception {
admin.brokers().healthcheck(TopicVersion.V2);
- NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
+ NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
pulsar.getConfig());
TopicName heartbeatTopicName = TopicName.get("persistent", namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
@@ -230,11 +230,11 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
Assert.assertEquals(topics.size(), 0);
}
-
+
@Test
public void testHeartbeatNamespaceNotCreateTransactionInternalTopic() throws Exception {
admin.brokers().healthcheck(TopicVersion.V2);
- NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
+ NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(),
pulsar.getConfig());
TopicName topicName = TopicName.get("persistent",
namespaceName, SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index 2611f0d8969..4b5af5e595c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
+import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
@@ -157,6 +158,8 @@ public class PulsarTestContext implements AutoCloseable {
private final boolean startable;
+ private final boolean preallocatePorts;
+
public ManagedLedgerFactory getManagedLedgerFactory() {
return managedLedgerClientFactory.getManagedLedgerFactory();
@@ -228,7 +231,9 @@ public class PulsarTestContext implements AutoCloseable {
protected SpyConfig.Builder spyConfigBuilder = SpyConfig.builder(SpyConfig.SpyType.NONE);
protected Consumer<PulsarService> pulsarServiceCustomizer;
protected ServiceConfiguration svcConfig = initializeConfig();
- protected Consumer<ServiceConfiguration> configOverrideCustomizer = this::defaultOverrideServiceConfiguration;
+ protected Consumer<ServiceConfiguration> configOverrideCustomizer;
+
+ protected boolean configOverrideCalled = false;
protected Function<BrokerService, BrokerService> brokerServiceCustomizer = Function.identity();
protected PulsarTestContext otherContextToClose;
@@ -354,6 +359,7 @@ public class PulsarTestContext implements AutoCloseable {
*/
public Builder configOverride(Consumer<ServiceConfiguration> configOverrideCustomizer) {
this.configOverrideCustomizer = configOverrideCustomizer;
+ this.configOverrideCalled = true;
return this;
}
@@ -538,6 +544,12 @@ public class PulsarTestContext implements AutoCloseable {
if (super.config == null) {
config(svcConfig);
}
+ handlePreallocatePorts(super.config);
+ if (configOverrideCustomizer != null || !configOverrideCalled) {
+ // call defaultOverrideServiceConfiguration if configOverrideCustomizer
+ // isn't explicitly set to null with `.configOverride(null)` call
+ defaultOverrideServiceConfiguration(super.config);
+ }
if (configOverrideCustomizer != null) {
configOverrideCustomizer.accept(super.config);
}
@@ -562,6 +574,37 @@ public class PulsarTestContext implements AutoCloseable {
return super.build();
}
+ protected void handlePreallocatePorts(ServiceConfiguration config) {
+ if (super.preallocatePorts) {
+ config.getBrokerServicePort().ifPresent(portNumber -> {
+ if (portNumber == 0) {
+ config.setBrokerServicePort(Optional.of(PortManager.nextLockedFreePort()));
+ }
+ });
+ config.getBrokerServicePortTls().ifPresent(portNumber -> {
+ if (portNumber == 0) {
+ config.setBrokerServicePortTls(Optional.of(PortManager.nextLockedFreePort()));
+ }
+ });
+ config.getWebServicePort().ifPresent(portNumber -> {
+ if (portNumber == 0) {
+ config.setWebServicePort(Optional.of(PortManager.nextLockedFreePort()));
+ }
+ });
+ config.getWebServicePortTls().ifPresent(portNumber -> {
+ if (portNumber == 0) {
+ config.setWebServicePortTls(Optional.of(PortManager.nextLockedFreePort()));
+ }
+ });
+ registerCloseable(() -> {
+ config.getBrokerServicePort().ifPresent(PortManager::releaseLockedPort);
+ config.getBrokerServicePortTls().ifPresent(PortManager::releaseLockedPort);
+ config.getWebServicePort().ifPresent(PortManager::releaseLockedPort);
+ config.getWebServicePortTls().ifPresent(PortManager::releaseLockedPort);
+ });
+ }
+ }
+
private void initializeCommonPulsarServices(SpyConfig spyConfig) {
if (super.bookKeeperClient == null && super.managedLedgerClientFactory == null) {
if (super.executor == null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index a632608bf70..cb72c7d42cd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -172,7 +172,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
// mock: return Broker2 as a Least-loaded broker when leader receives request [3]
doReturn(true).when(loadManager1).isCentralized();
- SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getSafeWebServiceAddress(), null);
+ SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getBrokerId(), null);
doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
@@ -305,7 +305,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
// mock: return Broker2 as a Least-loaded broker when leader receives request
doReturn(true).when(loadManager2).isCentralized();
- SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getSafeWebServiceAddress(), null);
+ SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getBrokerId(), null);
doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager2));
/**** started broker-2 ****/
@@ -485,7 +485,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
// request [3]
doReturn(true).when(loadManager1).isCentralized();
doReturn(true).when(loadManager2).isCentralized();
- SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddressTls(), null);
+ SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getBrokerId(), null);
doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
@@ -579,7 +579,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));
// mock: return Broker1 as a Least-loaded broker when leader receives request [3]
doReturn(true).when(loadManager1).isCentralized();
- SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getSafeWebServiceAddress(), null);
+ SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getBrokerId(), null);
doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
@@ -694,7 +694,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));
// mock: return Broker1 as a Least-loaded broker when leader receives request [3]
doReturn(true).when(loadManager1).isCentralized();
- SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getSafeWebServiceAddress(), null);
+ SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getBrokerId(), null);
Optional<ResourceUnit> res = Optional.of(resourceUnit);
doReturn(res).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
doReturn(res).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 6bc848a90d0..698ab15940b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1778,9 +1778,9 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
@SneakyThrows
@Test
public void testHealthCheckTopicNotCompacted() {
- NamespaceName heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), pulsar.getConfiguration());
+ NamespaceName heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(pulsar.getBrokerId(), pulsar.getConfiguration());
String topicV1 = "persistent://" + heartbeatNamespaceV1.toString() + "/healthcheck";
- NamespaceName heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(), pulsar.getConfiguration());
+ NamespaceName heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(pulsar.getBrokerId(), pulsar.getConfiguration());
String topicV2 = heartbeatNamespaceV2.toString() + "/healthcheck";
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicV1).create();
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicV2).create();
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BrokerInfo.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BrokerInfo.java
index 8955fe7a0ac..19e9ff2d15a 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BrokerInfo.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BrokerInfo.java
@@ -25,9 +25,11 @@ import org.apache.pulsar.common.policies.data.impl.BrokerInfoImpl;
*/
public interface BrokerInfo {
String getServiceUrl();
+ String getBrokerId();
interface Builder {
Builder serviceUrl(String serviceUrl);
+ Builder brokerId(String brokerId);
BrokerInfo build();
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BrokerInfoImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BrokerInfoImpl.java
index e4d0a68b50a..d77f693c7cd 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BrokerInfoImpl.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BrokerInfoImpl.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.common.policies.data.BrokerInfo;
@NoArgsConstructor
public final class BrokerInfoImpl implements BrokerInfo {
private String serviceUrl;
+ private String brokerId;
public static BrokerInfoImplBuilder builder() {
return new BrokerInfoImplBuilder();
@@ -38,14 +39,20 @@ public final class BrokerInfoImpl implements BrokerInfo {
public static class BrokerInfoImplBuilder implements BrokerInfo.Builder {
private String serviceUrl;
+ private String brokerId;
public BrokerInfoImplBuilder serviceUrl(String serviceUrl) {
this.serviceUrl = serviceUrl;
return this;
}
+ public BrokerInfoImplBuilder brokerId(String brokerId) {
+ this.brokerId = brokerId;
+ return this;
+ }
+
public BrokerInfoImpl build() {
- return new BrokerInfoImpl(serviceUrl);
+ return new BrokerInfoImpl(serviceUrl, brokerId);
}
}
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
index 147c5396520..f997532b273 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
@@ -142,7 +142,7 @@ public class ProxyWithExtensibleLoadManagerTest extends MultiBrokerBaseTest {
var srcBrokerUrl = admin.lookups().lookupTopic(topicName.toString());
return getAllBrokers().stream().
filter(pulsarService -> !Objects.equals(srcBrokerUrl, pulsarService.getBrokerServiceUrl())).
- map(PulsarService::getLookupServiceAddress).
+ map(PulsarService::getBrokerId).
findAny().orElseThrow(() -> new Exception("Could not determine destination broker lookup URL"));
}