You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by kw...@apache.org on 2023/10/18 01:04:58 UTC
[pulsar] branch branch-3.1 updated: [fix][broker][branch-3.1] Fix lookup heartbeat and sla namespace bundle when using extensible load manager (#21213) (#21314)
This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 045441005f6 [fix][broker][branch-3.1] Fix lookup heartbeat and sla namespace bundle when using extensible load manager (#21213) (#21314)
045441005f6 is described below
commit 045441005f60040520cd0ca8610bc7e932c77355
Author: Kai Wang <kw...@apache.org>
AuthorDate: Tue Oct 17 20:04:51 2023 -0500
[fix][broker][branch-3.1] Fix lookup heartbeat and sla namespace bundle when using extensible load manager (#21213) (#21314)
---
.../org/apache/pulsar/broker/PulsarService.java | 2 +-
.../apache/pulsar/broker/loadbalance/LoadData.java | 2 +-
.../extensions/ExtensibleLoadManagerImpl.java | 338 ++++++++++++---------
.../channel/ServiceUnitStateChannelImpl.java | 71 ++---
.../loadbalance/extensions/models/TopKBundles.java | 5 +-
.../pulsar/broker/namespace/NamespaceService.java | 38 +--
.../extensions/ExtensibleLoadManagerImplTest.java | 70 ++++-
.../channel/ServiceUnitStateChannelTest.java | 57 +---
8 files changed, 320 insertions(+), 263 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 4ffb5b77d54..6747bbb916d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1176,7 +1176,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
protected void acquireSLANamespace() {
try {
// Namespace not created hence no need to unload it
- NamespaceName nsName = NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), config);
+ NamespaceName nsName = NamespaceService.getSLAMonitorNamespace(getLookupServiceAddress(), config);
if (!this.pulsarResources.getNamespaceResources().namespaceExists(nsName)) {
LOG.info("SLA Namespace = {} doesn't exist.", nsName);
return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
index a632a47f051..c1fe2a4930c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
@@ -64,7 +64,7 @@ public class LoadData {
public Map<String, BundleData> getBundleDataForLoadShedding() {
return bundleData.entrySet().stream()
- .filter(e -> !NamespaceService.filterNamespaceForShedding(
+ .filter(e -> !NamespaceService.isSLAOrHeartbeatNamespace(
NamespaceBundle.getBundleNamespace(e.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index cba499eb8ee..85baf9ec4fb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -44,6 +44,7 @@ import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -86,6 +87,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsa
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
+import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
@@ -95,7 +97,6 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.slf4j.Logger;
@@ -152,6 +153,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
@Getter
private final List<BrokerFilter> brokerFilterPipeline;
+
/**
* The load data reporter.
*/
@@ -181,10 +183,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
// record split metrics
private final AtomicReference<List<Metrics>> splitMetrics = new AtomicReference<>();
- private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
- lookupRequests = ConcurrentOpenHashMap.<String,
- CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
- .build();
+ private final ConcurrentHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
+ lookupRequests = new ConcurrentHashMap<>();
private final CountDownLatch initWaiter = new CountDownLatch(1);
/**
@@ -197,7 +197,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
}
Set<Map.Entry<String, ServiceUnitStateData>> entrySet = serviceUnitStateChannel.getOwnershipEntrySet();
String brokerId = brokerRegistry.getBrokerId();
- return entrySet.stream()
+ Set<NamespaceBundle> ownedServiceUnits = entrySet.stream()
.filter(entry -> {
var stateData = entry.getValue();
return stateData.state() == ServiceUnitState.Owned
@@ -207,6 +207,36 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
var bundle = entry.getKey();
return getNamespaceBundle(pulsar, bundle);
}).collect(Collectors.toSet());
+ // Add heartbeat and SLA monitor namespace bundle.
+ NamespaceName heartbeatNamespace = NamespaceService.getHeartbeatNamespace(brokerId, pulsar.getConfiguration());
+ try {
+ NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getFullBundle(heartbeatNamespace);
+ ownedServiceUnits.add(fullBundle);
+ } catch (Exception e) {
+ log.warn("Failed to get heartbeat namespace bundle.", e);
+ }
+ NamespaceName heartbeatNamespaceV2 = NamespaceService
+ .getHeartbeatNamespaceV2(brokerId, pulsar.getConfiguration());
+ try {
+ NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getFullBundle(heartbeatNamespaceV2);
+ ownedServiceUnits.add(fullBundle);
+ } catch (Exception e) {
+ log.warn("Failed to get heartbeat namespace V2 bundle.", e);
+ }
+
+ NamespaceName slaMonitorNamespace = NamespaceService
+ .getSLAMonitorNamespace(brokerId, pulsar.getConfiguration());
+ try {
+ NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getFullBundle(slaMonitorNamespace);
+ ownedServiceUnits.add(fullBundle);
+ } catch (Exception e) {
+ log.warn("Failed to get SLA Monitor namespace bundle.", e);
+ }
+
+ return ownedServiceUnits;
}
public enum Role {
@@ -261,102 +291,108 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
if (this.started) {
return;
}
- this.brokerRegistry = new BrokerRegistryImpl(pulsar);
- this.leaderElectionService = new LeaderElectionService(
- pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
- state -> {
- pulsar.getLoadManagerExecutor().execute(() -> {
- if (state == LeaderElectionState.Leading) {
- playLeader();
- } else {
- playFollower();
- }
+ try {
+ this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+ this.leaderElectionService = new LeaderElectionService(
+ pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
+ state -> {
+ pulsar.getLoadManagerExecutor().execute(() -> {
+ if (state == LeaderElectionState.Leading) {
+ playLeader();
+ } else {
+ playFollower();
+ }
+ });
});
- });
- this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
- this.brokerRegistry.start();
- this.splitManager = new SplitManager(splitCounter);
- this.unloadManager = new UnloadManager(unloadCounter);
- this.serviceUnitStateChannel.listen(unloadManager);
- this.serviceUnitStateChannel.listen(splitManager);
- this.leaderElectionService.start();
- this.serviceUnitStateChannel.start();
- this.antiAffinityGroupPolicyHelper =
- new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel);
- antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
- this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper);
- this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter);
- SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar);
- this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
- this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));
-
- createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
- createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
+ this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+ this.brokerRegistry.start();
+ this.splitManager = new SplitManager(splitCounter);
+ this.unloadManager = new UnloadManager(unloadCounter);
+ this.serviceUnitStateChannel.listen(unloadManager);
+ this.serviceUnitStateChannel.listen(splitManager);
+ this.leaderElectionService.start();
+ this.serviceUnitStateChannel.start();
+ this.antiAffinityGroupPolicyHelper =
+ new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel);
+ antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
+ this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper);
+ this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter);
+ SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar);
+ this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
+ this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));
+
+ createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
+ createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
- try {
- this.brokerLoadDataStore = LoadDataStoreFactory
- .create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
- this.brokerLoadDataStore.startTableView();
- this.topBundlesLoadDataStore = LoadDataStoreFactory
- .create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
- } catch (LoadDataStoreException e) {
- throw new PulsarServerException(e);
- }
+ try {
+ this.brokerLoadDataStore = LoadDataStoreFactory
+ .create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
+ this.brokerLoadDataStore.startTableView();
+ this.topBundlesLoadDataStore = LoadDataStoreFactory
+ .create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
+ } catch (LoadDataStoreException e) {
+ throw new PulsarServerException(e);
+ }
- this.context = LoadManagerContextImpl.builder()
- .configuration(conf)
- .brokerRegistry(brokerRegistry)
- .brokerLoadDataStore(brokerLoadDataStore)
- .topBundleLoadDataStore(topBundlesLoadDataStore).build();
-
- this.brokerLoadDataReporter =
- new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore);
-
- this.topBundleLoadDataReporter =
- new TopBundleLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), topBundlesLoadDataStore);
- this.serviceUnitStateChannel.listen(brokerLoadDataReporter);
- this.serviceUnitStateChannel.listen(topBundleLoadDataReporter);
- var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis();
- this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
- .scheduleAtFixedRate(() -> {
- try {
- brokerLoadDataReporter.reportAsync(false);
- // TODO: update broker load metrics using getLocalData
- } catch (Throwable e) {
- log.error("Failed to run the broker load manager executor job.", e);
- }
- },
- interval,
- interval, TimeUnit.MILLISECONDS);
-
- this.topBundlesLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
- .scheduleAtFixedRate(() -> {
- try {
- // TODO: consider excluding the bundles that are in the process of split.
- topBundleLoadDataReporter.reportAsync(false);
- } catch (Throwable e) {
- log.error("Failed to run the top bundles load manager executor job.", e);
- }
- },
- interval,
- interval, TimeUnit.MILLISECONDS);
-
- this.monitorTask = this.pulsar.getLoadManagerExecutor()
- .scheduleAtFixedRate(() -> {
- monitor();
- },
- MONITOR_INTERVAL_IN_MILLIS,
- MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);
-
- this.unloadScheduler = new UnloadScheduler(
- pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context,
- serviceUnitStateChannel, unloadCounter, unloadMetrics);
- this.unloadScheduler.start();
- this.splitScheduler = new SplitScheduler(
- pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
- this.splitScheduler.start();
- this.initWaiter.countDown();
- this.started = true;
+ this.context = LoadManagerContextImpl.builder()
+ .configuration(conf)
+ .brokerRegistry(brokerRegistry)
+ .brokerLoadDataStore(brokerLoadDataStore)
+ .topBundleLoadDataStore(topBundlesLoadDataStore).build();
+
+ this.brokerLoadDataReporter =
+ new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore);
+
+ this.topBundleLoadDataReporter =
+ new TopBundleLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), topBundlesLoadDataStore);
+ this.serviceUnitStateChannel.listen(brokerLoadDataReporter);
+ this.serviceUnitStateChannel.listen(topBundleLoadDataReporter);
+ var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis();
+ this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
+ .scheduleAtFixedRate(() -> {
+ try {
+ brokerLoadDataReporter.reportAsync(false);
+ // TODO: update broker load metrics using getLocalData
+ } catch (Throwable e) {
+ log.error("Failed to run the broker load manager executor job.", e);
+ }
+ },
+ interval,
+ interval, TimeUnit.MILLISECONDS);
+
+ this.topBundlesLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
+ .scheduleAtFixedRate(() -> {
+ try {
+ // TODO: consider excluding the bundles that are in the process of split.
+ topBundleLoadDataReporter.reportAsync(false);
+ } catch (Throwable e) {
+ log.error("Failed to run the top bundles load manager executor job.", e);
+ }
+ },
+ interval,
+ interval, TimeUnit.MILLISECONDS);
+
+ this.monitorTask = this.pulsar.getLoadManagerExecutor()
+ .scheduleAtFixedRate(() -> {
+ monitor();
+ },
+ MONITOR_INTERVAL_IN_MILLIS,
+ MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);
+
+ this.unloadScheduler = new UnloadScheduler(
+ pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context,
+ serviceUnitStateChannel, unloadCounter, unloadMetrics);
+ this.unloadScheduler.start();
+ this.splitScheduler = new SplitScheduler(
+ pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
+ this.splitScheduler.start();
+ this.initWaiter.countDown();
+ this.started = true;
+ } catch (Exception ex) {
+ if (this.brokerRegistry != null) {
+ brokerRegistry.close();
+ }
+ }
}
@Override
@@ -377,25 +413,38 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
owner = serviceUnitStateChannel.getChannelOwnerAsync();
} else {
- owner = getOwnerAsync(serviceUnit, bundle, false).thenApply(Optional::ofNullable);
+ String candidateBrokerId = getHeartbeatOrSLAMonitorBrokerId(serviceUnit);
+ if (candidateBrokerId != null) {
+ owner = CompletableFuture.completedFuture(Optional.of(candidateBrokerId));
+ } else {
+ owner = getOrSelectOwnerAsync(serviceUnit, bundle).thenApply(Optional::ofNullable);
+ }
}
return getBrokerLookupData(owner, bundle);
});
}
- private CompletableFuture<String> getOwnerAsync(
- ServiceUnitId serviceUnit, String bundle, boolean ownByLocalBrokerIfAbsent) {
+ private String getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) {
+ // Check if this is Heartbeat or SLAMonitor namespace
+ String candidateBroker = NamespaceService.checkHeartbeatNamespace(serviceUnit);
+ if (candidateBroker == null) {
+ candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(serviceUnit);
+ }
+ if (candidateBroker == null) {
+ candidateBroker = NamespaceService.getSLAMonitorBrokerName(serviceUnit);
+ }
+ if (candidateBroker != null) {
+ return candidateBroker.substring(candidateBroker.lastIndexOf('/') + 1);
+ }
+ return candidateBroker;
+ }
+
+ private CompletableFuture<String> getOrSelectOwnerAsync(ServiceUnitId serviceUnit,
+ String bundle) {
return serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
// If the bundle not assign yet, select and publish assign event to channel.
if (broker.isEmpty()) {
- CompletableFuture<Optional<String>> selectedBroker;
- if (ownByLocalBrokerIfAbsent) {
- String brokerId = this.brokerRegistry.getBrokerId();
- selectedBroker = CompletableFuture.completedFuture(Optional.of(brokerId));
- } else {
- selectedBroker = this.selectAsync(serviceUnit);
- }
- return selectedBroker.thenCompose(brokerOpt -> {
+ return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
if (brokerOpt.isPresent()) {
assignCounter.incrementSuccess();
log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
@@ -425,7 +474,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
}).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
if (brokerLookupData.isEmpty()) {
String errorMsg = String.format(
- "Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
+ "Failed to lookup broker:%s for bundle:%s, the broker has not been registered.",
+ broker, bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
@@ -443,30 +493,37 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(NamespaceBundle namespaceBundle) {
log.info("Try acquiring ownership for bundle: {} - {}.", namespaceBundle, brokerRegistry.getBrokerId());
final String bundle = namespaceBundle.toString();
- return dedupeLookupRequest(bundle, k -> {
- final CompletableFuture<String> owner =
- this.getOwnerAsync(namespaceBundle, bundle, true);
- return getBrokerLookupData(owner.thenApply(Optional::ofNullable), bundle);
- }).thenApply(brokerLookupData -> {
- if (brokerLookupData.isEmpty()) {
- throw new IllegalStateException(
- "Failed to get the broker lookup data for bundle: " + bundle);
- }
- return brokerLookupData.get().toNamespaceEphemeralData();
- });
+ return assign(Optional.empty(), namespaceBundle)
+ .thenApply(brokerLookupData -> {
+ if (brokerLookupData.isEmpty()) {
+ String errorMsg = String.format(
+ "Failed to get the broker lookup data for bundle:%s", bundle);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
+ }
+ return brokerLookupData.get().toNamespaceEphemeralData();
+ });
}
private CompletableFuture<Optional<BrokerLookupData>> dedupeLookupRequest(
String key, Function<String, CompletableFuture<Optional<BrokerLookupData>>> provider) {
- CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(key, provider);
- future.whenComplete((r, t) -> {
- if (t != null) {
+ final MutableObject<CompletableFuture<Optional<BrokerLookupData>>> newFutureCreated = new MutableObject<>();
+ try {
+ return lookupRequests.computeIfAbsent(key, k -> {
+ CompletableFuture<Optional<BrokerLookupData>> future = provider.apply(k);
+ newFutureCreated.setValue(future);
+ return future;
+ });
+ } finally {
+ if (newFutureCreated.getValue() != null) {
+ newFutureCreated.getValue().whenComplete((v, ex) -> {
+ if (ex != null) {
assignCounter.incrementFailure();
}
- lookupRequests.remove(key);
- }
- );
- return future;
+ lookupRequests.remove(key, newFutureCreated.getValue());
+ });
+ }
+ }
}
public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
@@ -521,15 +578,16 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
}
public CompletableFuture<Optional<String>> getOwnershipAsync(Optional<ServiceUnitId> topic,
- ServiceUnitId bundleUnit) {
- final String bundle = bundleUnit.toString();
- CompletableFuture<Optional<String>> owner;
+ ServiceUnitId serviceUnit) {
+ final String bundle = serviceUnit.toString();
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
- owner = serviceUnitStateChannel.getChannelOwnerAsync();
- } else {
- owner = serviceUnitStateChannel.getOwnerAsync(bundle);
+ return serviceUnitStateChannel.getChannelOwnerAsync();
}
- return owner;
+ String candidateBroker = getHeartbeatOrSLAMonitorBrokerId(serviceUnit);
+ if (candidateBroker != null) {
+ return CompletableFuture.completedFuture(Optional.of(candidateBroker));
+ }
+ return serviceUnitStateChannel.getOwnerAsync(bundle);
}
public CompletableFuture<Optional<BrokerLookupData>> getOwnershipWithLookupDataAsync(ServiceUnitId bundleUnit) {
@@ -543,6 +601,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
Optional<String> destinationBroker) {
+ if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
+ log.info("Skip unloading namespace bundle: {}.", bundle);
+ return CompletableFuture.completedFuture(null);
+ }
return getOwnershipAsync(Optional.empty(), bundle)
.thenCompose(brokerOpt -> {
if (brokerOpt.isEmpty()) {
@@ -577,6 +639,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
public CompletableFuture<Void> splitNamespaceBundleAsync(ServiceUnitId bundle,
NamespaceBundleSplitAlgorithm splitAlgorithm,
List<Long> boundaries) {
+ if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
+ log.info("Skip split namespace bundle: {}.", bundle);
+ return CompletableFuture.completedFuture(null);
+ }
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle.toString());
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle.toString());
NamespaceBundle namespaceBundle =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 98aa02d4e72..d71513652e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -41,8 +41,6 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Stable;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT;
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT_V2;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
@@ -94,7 +92,6 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
-import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
@@ -1216,48 +1213,19 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
log.info("Started ownership cleanup for the inactive broker:{}", broker);
int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
- String heartbeatNamespace =
- NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), broker)).toString();
- String heartbeatNamespaceV2 =
- NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, broker)).toString();
-
Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new HashMap<>();
for (var etr : tableview.entrySet()) {
var stateData = etr.getValue();
var serviceUnit = etr.getKey();
var state = state(stateData);
- if (StringUtils.equals(broker, stateData.dstBroker())) {
- if (isActiveState(state)) {
- if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
- orphanSystemServiceUnits.put(serviceUnit, stateData);
- } else if (serviceUnit.startsWith(heartbeatNamespace)
- || serviceUnit.startsWith(heartbeatNamespaceV2)) {
- // Skip the heartbeat namespace
- log.info("Skip override heartbeat namespace bundle"
- + " serviceUnit:{}, stateData:{}", serviceUnit, stateData);
- tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
- if (e != null) {
- log.error("Failed cleaning the heartbeat namespace ownership serviceUnit:{}, "
- + "stateData:{}, cleanupErrorCnt:{}.",
- serviceUnit, stateData,
- totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e);
- }
- });
- } else {
- overrideOwnership(serviceUnit, stateData, broker);
- }
- orphanServiceUnitCleanupCnt++;
- }
-
- } else if (StringUtils.equals(broker, stateData.sourceBroker())) {
- if (isInFlightState(state)) {
- if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
- orphanSystemServiceUnits.put(serviceUnit, stateData);
- } else {
- overrideOwnership(serviceUnit, stateData, broker);
- }
- orphanServiceUnitCleanupCnt++;
+ if (StringUtils.equals(broker, stateData.dstBroker()) && isActiveState(state)
+ || StringUtils.equals(broker, stateData.sourceBroker()) && isInFlightState(state)) {
+ if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
+ orphanSystemServiceUnits.put(serviceUnit, stateData);
+ } else {
+ overrideOwnership(serviceUnit, stateData, broker);
}
+ orphanServiceUnitCleanupCnt++;
}
}
@@ -1401,16 +1369,21 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
String srcBroker = stateData.sourceBroker();
var state = stateData.state();
- if (isActiveState(state)) {
- if (StringUtils.isNotBlank(srcBroker) && !activeBrokers.contains(srcBroker)) {
- inactiveBrokers.add(srcBroker);
- } else if (StringUtils.isNotBlank(dstBroker) && !activeBrokers.contains(dstBroker)) {
- inactiveBrokers.add(dstBroker);
- } else if (isInFlightState(state)
- && now - stateData.timestamp() > inFlightStateWaitingTimeInMillis) {
- orphanServiceUnits.put(serviceUnit, stateData);
- }
- } else if (now - stateData.timestamp() > semiTerminalStateWaitingTimeInMillis) {
+ if (isActiveState(state) && StringUtils.isNotBlank(srcBroker) && !activeBrokers.contains(srcBroker)) {
+ inactiveBrokers.add(srcBroker);
+ continue;
+ }
+ if (isActiveState(state) && StringUtils.isNotBlank(dstBroker) && !activeBrokers.contains(dstBroker)) {
+ inactiveBrokers.add(dstBroker);
+ continue;
+ }
+ if (isActiveState(state) && isInFlightState(state)
+ && now - stateData.timestamp() > inFlightStateWaitingTimeInMillis) {
+ orphanServiceUnits.put(serviceUnit, stateData);
+ continue;
+ }
+
+ if (now - stateData.timestamp() > semiTerminalStateWaitingTimeInMillis) {
log.info("Found semi-terminal states to tombstone"
+ " serviceUnit:{}, stateData:{}", serviceUnit, stateData);
tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
index 2f5c32197c1..624546fdff8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
@@ -30,6 +30,8 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
@@ -70,7 +72,8 @@ public class TopKBundles {
pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
for (var etr : bundleStats.entrySet()) {
String bundle = etr.getKey();
- if (bundle.startsWith(NamespaceName.SYSTEM_NAMESPACE.toString())) {
+ // TODO: do not filter system topic while shedding
+ if (NamespaceService.isSystemServiceNamespace(NamespaceBundle.getBundleNamespace(bundle))) {
continue;
}
if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled && hasPolicies(bundle)) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index d66e3c3b65d..57c0cc7c046 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -135,7 +135,7 @@ public class NamespaceService implements AutoCloseable {
public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)");
public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s";
public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s";
- public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s:%s";
+ public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s";
private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl> namespaceClients;
@@ -189,7 +189,7 @@ public class NamespaceService implements AutoCloseable {
CompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic)
.thenCompose(bundle -> {
// Do redirection if the cluster is in rollback or deploying.
- return redirectManager.findRedirectLookupResultAsync().thenCompose(optResult -> {
+ return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
if (optResult.isPresent()) {
LOG.info("[{}] Redirect lookup request to {} for topic {}",
pulsar.getSafeWebServiceAddress(), optResult.get(), topic);
@@ -221,6 +221,13 @@ public class NamespaceService implements AutoCloseable {
return future;
}
+ private CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync(ServiceUnitId bundle) {
+ if (isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+ return redirectManager.findRedirectLookupResultAsync();
+ }
+
public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
return bundleFactory.getBundlesAsync(topic.getNamespaceObject())
.thenApply(bundles -> bundles.findBundle(topic));
@@ -288,8 +295,7 @@ public class NamespaceService implements AutoCloseable {
private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(@Nullable ServiceUnitId topic,
NamespaceBundle bundle,
LookupOptions options) {
-
- return redirectManager.findRedirectLookupResultAsync().thenCompose(optResult -> {
+ return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
if (optResult.isPresent()) {
LOG.info("[{}] Redirect lookup request to {} for topic {}",
pulsar.getSafeWebServiceAddress(), optResult.get(), topic);
@@ -695,7 +701,7 @@ public class NamespaceService implements AutoCloseable {
return lookupFuture;
}
- private boolean isBrokerActive(String candidateBroker) {
+ public boolean isBrokerActive(String candidateBroker) {
String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
Set<String> availableBrokers = getAvailableBrokers();
if (availableBrokers.contains(candidateBrokerHostAndPort)) {
@@ -1564,7 +1570,7 @@ public class NamespaceService implements AutoCloseable {
}
public void unloadSLANamespace() throws Exception {
- NamespaceName namespaceName = getSLAMonitorNamespace(host, config);
+ NamespaceName namespaceName = getSLAMonitorNamespace(pulsar.getLookupServiceAddress(), config);
LOG.info("Checking owner for SLA namespace {}", namespaceName);
@@ -1589,14 +1595,8 @@ public class NamespaceService implements AutoCloseable {
return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, lookupBroker));
}
- public static NamespaceName getSLAMonitorNamespace(String host, ServiceConfiguration config) {
- Integer port = null;
- if (config.getWebServicePort().isPresent()) {
- port = config.getWebServicePort().get();
- } else if (config.getWebServicePortTls().isPresent()) {
- port = config.getWebServicePortTls().get();
- }
- return NamespaceName.get(String.format(SLA_NAMESPACE_FMT, config.getClusterName(), host, port));
+ public static NamespaceName getSLAMonitorNamespace(String lookupBroker, ServiceConfiguration config) {
+ return NamespaceName.get(String.format(SLA_NAMESPACE_FMT, config.getClusterName(), lookupBroker));
}
public static String checkHeartbeatNamespace(ServiceUnitId ns) {
@@ -1640,7 +1640,7 @@ public class NamespaceService implements AutoCloseable {
* @param namespace the namespace name
* @return True if namespace is HEARTBEAT_NAMESPACE or SLA_NAMESPACE
*/
- public static boolean filterNamespaceForShedding(String namespace) {
+ public static boolean isSLAOrHeartbeatNamespace(String namespace) {
return SLA_NAMESPACE_PATTERN.matcher(namespace).matches()
|| HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
|| HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
@@ -1653,14 +1653,16 @@ public class NamespaceService implements AutoCloseable {
}
public boolean registerSLANamespace() throws PulsarServerException {
- boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(host, config), false);
+ String lookupServiceAddress = pulsar.getLookupServiceAddress();
+ boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(lookupServiceAddress, config), false);
if (isNameSpaceRegistered) {
if (LOG.isDebugEnabled()) {
LOG.debug("Added SLA Monitoring namespace name in local cache: ns={}",
- getSLAMonitorNamespace(host, config));
+ getSLAMonitorNamespace(lookupServiceAddress, config));
}
} else if (LOG.isDebugEnabled()) {
- LOG.debug("SLA Monitoring not owned by the broker: ns={}", getSLAMonitorNamespace(host, config));
+ LOG.debug("SLA Monitoring not owned by the broker: ns={}",
+ getSLAMonitorNamespace(lookupServiceAddress, config));
}
return isNameSpaceRegistered;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 9ce57a88540..011e7174cbe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -35,6 +35,9 @@ import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecis
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
+import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespace;
+import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespaceV2;
+import static org.apache.pulsar.broker.namespace.NamespaceService.getSLAMonitorNamespace;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -594,6 +597,18 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
assertTrue(webServiceUrl3.isPresent());
assertEquals(webServiceUrl3.get().toString(), webServiceUrl1.get().toString());
+ List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2, pulsar3);
+ for (PulsarService pulsarService : pulsarServices) {
+ // Test lookup heartbeat namespace's topic
+ for (PulsarService pulsar : pulsarServices) {
+ assertLookupHeartbeatOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ }
+ // Test lookup SLA namespace's topic
+ for (PulsarService pulsar : pulsarServices) {
+ assertLookupSLANamespaceOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ }
+ }
+
// Test deploy new broker with new load manager
ServiceConfiguration conf = getDefaultConf();
conf.setAllowAutoTopicCreation(true);
@@ -642,10 +657,48 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
assertTrue(webServiceUrl4.isPresent());
assertEquals(webServiceUrl4.get().toString(), webServiceUrl1.get().toString());
+ pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4);
+ for (PulsarService pulsarService : pulsarServices) {
+ // Test lookup heartbeat namespace's topic
+ for (PulsarService pulsar : pulsarServices) {
+ assertLookupHeartbeatOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ }
+ // Test lookup SLA namespace's topic
+ for (PulsarService pulsar : pulsarServices) {
+ assertLookupSLANamespaceOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ }
+ }
}
}
}
+ private void assertLookupHeartbeatOwner(PulsarService pulsar,
+ String lookupServiceAddress,
+ String expectedBrokerServiceUrl) throws Exception {
+ NamespaceName heartbeatNamespaceV1 =
+ getHeartbeatNamespace(lookupServiceAddress, pulsar.getConfiguration());
+
+ String heartbeatV1Topic = heartbeatNamespaceV1.getPersistentTopicName("test");
+ assertEquals(pulsar.getAdminClient().lookups().lookupTopic(heartbeatV1Topic), expectedBrokerServiceUrl);
+
+ NamespaceName heartbeatNamespaceV2 =
+ getHeartbeatNamespaceV2(lookupServiceAddress, pulsar.getConfiguration());
+
+ String heartbeatV2Topic = heartbeatNamespaceV2.getPersistentTopicName("test");
+ assertEquals(pulsar.getAdminClient().lookups().lookupTopic(heartbeatV2Topic), expectedBrokerServiceUrl);
+ }
+
+ private void assertLookupSLANamespaceOwner(PulsarService pulsar,
+ String lookupServiceAddress,
+ String expectedBrokerServiceUrl) throws Exception {
+ NamespaceName slaMonitorNamespace = getSLAMonitorNamespace(lookupServiceAddress, pulsar.getConfiguration());
+ String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test");
+ String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+ log.info("Topic {} Lookup result: {}", slaMonitorTopic, result);
+ assertNotNull(result);
+ assertEquals(result, expectedBrokerServiceUrl);
+ }
+
@Test
public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws Exception {
var topBundlesLoadDataStorePrimary =
@@ -1043,15 +1096,15 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
admin.namespaces().deleteNamespace(namespace, true);
}
- @Test(timeOut = 30 * 1000)
+ @Test(timeOut = 30 * 1000, priority = -1)
public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exception {
NamespaceName heartbeatNamespacePulsar1V1 =
- NamespaceService.getHeartbeatNamespace(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration());
+ getHeartbeatNamespace(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration());
NamespaceName heartbeatNamespacePulsar1V2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration());
NamespaceName heartbeatNamespacePulsar2V1 =
- NamespaceService.getHeartbeatNamespace(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
+ getHeartbeatNamespace(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
NamespaceName heartbeatNamespacePulsar2V2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
@@ -1068,22 +1121,22 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
// heartbeat namespace bundle will own by pulsar1
- assertEquals(ownedServiceUnitsByPulsar1.size(), 2);
+ assertEquals(ownedServiceUnitsByPulsar1.size(), 3);
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1));
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2));
Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
- assertEquals(ownedServiceUnitsByPulsar2.size(), 2);
+ assertEquals(ownedServiceUnitsByPulsar2.size(), 3);
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3));
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4));
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress());
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress());
- assertEquals(ownedNamespacesByPulsar1.size(), 2);
+ assertEquals(ownedNamespacesByPulsar1.size(), 3);
assertTrue(ownedNamespacesByPulsar1.containsKey(bundle1.toString()));
assertTrue(ownedNamespacesByPulsar1.containsKey(bundle2.toString()));
- assertEquals(ownedNamespacesByPulsar2.size(), 2);
+ assertEquals(ownedNamespacesByPulsar2.size(), 3);
assertTrue(ownedNamespacesByPulsar2.containsKey(bundle3.toString()));
assertTrue(ownedNamespacesByPulsar2.containsKey(bundle4.toString()));
@@ -1134,7 +1187,8 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
String topic = "persistent://" + namespace + "/test";
NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get();
NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get();
- assertEquals(namespaceEphemeralData.getNativeUrl(), pulsar1.getBrokerServiceUrl());
+ assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl())
+ .contains(namespaceEphemeralData.getNativeUrl()));
admin.namespaces().deleteNamespace(namespace, true);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index de21842f458..a226df53e12 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -30,8 +30,6 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Unload;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT;
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT_V2;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.ConnectionLost;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
@@ -89,7 +87,6 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TableViewImpl;
-import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -639,7 +636,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
validateMonitorCounters(leader,
0,
- 1,
+ 3,
0,
0,
0,
@@ -756,34 +753,6 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
waitUntilNewOwner(channel1, bundle2, broker);
waitUntilNewOwner(channel2, bundle2, broker);
- // Register the broker-1 heartbeat namespace bundle.
- String heartbeatNamespaceBroker1V1 = NamespaceName
- .get(String.format(HEARTBEAT_NAMESPACE_FMT, conf.getClusterName(), broker)).toString();
- String heartbeatNamespaceBroker1V2 = NamespaceName
- .get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, broker)).toString();
- String heartbeatNamespaceBroker1V1Bundle = heartbeatNamespaceBroker1V1 + "/0x00000000_0xfffffff0";
- String heartbeatNamespaceBroker1V2Bundle = heartbeatNamespaceBroker1V2 + "/0x00000000_0xfffffff0";
- channel1.publishAssignEventAsync(heartbeatNamespaceBroker1V1Bundle, broker);
- channel1.publishAssignEventAsync(heartbeatNamespaceBroker1V2Bundle, broker);
-
- // Register the broker-2 heartbeat namespace bundle.
- String heartbeatNamespaceBroker2V1 = NamespaceName
- .get(String.format(HEARTBEAT_NAMESPACE_FMT, conf.getClusterName(), lookupServiceAddress2)).toString();
- String heartbeatNamespaceBroker2V2 = NamespaceName
- .get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, lookupServiceAddress2)).toString();
- String heartbeatNamespaceBroker2V1Bundle = heartbeatNamespaceBroker2V1 + "/0x00000000_0xfffffff0";
- String heartbeatNamespaceBroker2V2Bundle = heartbeatNamespaceBroker2V2 + "/0x00000000_0xfffffff0";
- channel1.publishAssignEventAsync(heartbeatNamespaceBroker2V1Bundle, lookupServiceAddress2);
- channel1.publishAssignEventAsync(heartbeatNamespaceBroker2V2Bundle, lookupServiceAddress2);
- waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V1Bundle, broker);
- waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V2Bundle, broker);
- waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V1Bundle, broker);
- waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V2Bundle, broker);
- waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V1Bundle, lookupServiceAddress2);
- waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V2Bundle, lookupServiceAddress2);
- waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V1Bundle, lookupServiceAddress2);
- waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V2Bundle, lookupServiceAddress2);
-
// Verify to transfer the ownership to the other broker.
channel1.publishUnloadEventAsync(new Unload(broker, bundle1, Optional.of(lookupServiceAddress2)));
waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
@@ -806,16 +775,6 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
waitUntilNewOwner(channel1, bundle2, lookupServiceAddress2);
waitUntilNewOwner(channel2, bundle2, lookupServiceAddress2);
- waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V1Bundle, null);
- waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V2Bundle, null);
- waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V1Bundle, null);
- waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V2Bundle, null);
-
- waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V1Bundle, null);
- waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V2Bundle, null);
- waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V1Bundle, null);
- waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V2Bundle, null);
-
verify(leaderCleanupJobs, times(1)).computeIfAbsent(eq(broker), any());
verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any());
@@ -827,7 +786,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
validateMonitorCounters(leaderChannel,
2,
0,
- 7,
+ 3,
0,
2,
0,
@@ -858,7 +817,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
validateMonitorCounters(leaderChannel,
2,
0,
- 7,
+ 3,
0,
3,
0,
@@ -879,7 +838,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
validateMonitorCounters(leaderChannel,
2,
0,
- 7,
+ 3,
0,
3,
0,
@@ -901,7 +860,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
validateMonitorCounters(leaderChannel,
2,
0,
- 7,
+ 3,
0,
4,
0,
@@ -923,7 +882,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
validateMonitorCounters(leaderChannel,
3,
0,
- 9,
+ 5,
0,
4,
0,
@@ -952,7 +911,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
validateMonitorCounters(leaderChannel,
3,
0,
- 9,
+ 5,
0,
4,
1,
@@ -1447,7 +1406,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
validateMonitorCounters(leader,
0,
- 1,
+ 3,
1,
0,
0,