You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by kw...@apache.org on 2023/10/18 01:04:58 UTC

[pulsar] branch branch-3.1 updated: [fix][broker][branch-3.1] Fix lookup heartbeat and sla namespace bundle when using extensible load manager (#21213) (#21314)

This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 045441005f6 [fix][broker][branch-3.1] Fix lookup heartbeat and sla namespace bundle when using extensible load manager (#21213) (#21314)
045441005f6 is described below

commit 045441005f60040520cd0ca8610bc7e932c77355
Author: Kai Wang <kw...@apache.org>
AuthorDate: Tue Oct 17 20:04:51 2023 -0500

    [fix][broker][branch-3.1] Fix lookup heartbeat and sla namespace bundle when using extensible load manager (#21213) (#21314)
---
 .../org/apache/pulsar/broker/PulsarService.java    |   2 +-
 .../apache/pulsar/broker/loadbalance/LoadData.java |   2 +-
 .../extensions/ExtensibleLoadManagerImpl.java      | 338 ++++++++++++---------
 .../channel/ServiceUnitStateChannelImpl.java       |  71 ++---
 .../loadbalance/extensions/models/TopKBundles.java |   5 +-
 .../pulsar/broker/namespace/NamespaceService.java  |  38 +--
 .../extensions/ExtensibleLoadManagerImplTest.java  |  70 ++++-
 .../channel/ServiceUnitStateChannelTest.java       |  57 +---
 8 files changed, 320 insertions(+), 263 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 4ffb5b77d54..6747bbb916d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1176,7 +1176,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     protected void acquireSLANamespace() {
         try {
             // Namespace not created hence no need to unload it
-            NamespaceName nsName = NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), config);
+            NamespaceName nsName = NamespaceService.getSLAMonitorNamespace(getLookupServiceAddress(), config);
             if (!this.pulsarResources.getNamespaceResources().namespaceExists(nsName)) {
                 LOG.info("SLA Namespace = {} doesn't exist.", nsName);
                 return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
index a632a47f051..c1fe2a4930c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
@@ -64,7 +64,7 @@ public class LoadData {
 
     public Map<String, BundleData> getBundleDataForLoadShedding() {
         return bundleData.entrySet().stream()
-                .filter(e -> !NamespaceService.filterNamespaceForShedding(
+                .filter(e -> !NamespaceService.isSLAOrHeartbeatNamespace(
                         NamespaceBundle.getBundleNamespace(e.getKey())))
                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index cba499eb8ee..85baf9ec4fb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -44,6 +44,7 @@ import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -86,6 +87,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsa
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
@@ -95,7 +97,6 @@ import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
 import org.slf4j.Logger;
 
@@ -152,6 +153,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
 
     @Getter
     private final List<BrokerFilter> brokerFilterPipeline;
+
     /**
      * The load data reporter.
      */
@@ -181,10 +183,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
     // record split metrics
     private final AtomicReference<List<Metrics>> splitMetrics = new AtomicReference<>();
 
-    private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
-            lookupRequests = ConcurrentOpenHashMap.<String,
-                    CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
-            .build();
+    private final ConcurrentHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
+            lookupRequests = new ConcurrentHashMap<>();
     private final CountDownLatch initWaiter = new CountDownLatch(1);
 
     /**
@@ -197,7 +197,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
         }
         Set<Map.Entry<String, ServiceUnitStateData>> entrySet = serviceUnitStateChannel.getOwnershipEntrySet();
         String brokerId = brokerRegistry.getBrokerId();
-        return entrySet.stream()
+        Set<NamespaceBundle> ownedServiceUnits = entrySet.stream()
                 .filter(entry -> {
                     var stateData = entry.getValue();
                     return stateData.state() == ServiceUnitState.Owned
@@ -207,6 +207,36 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
                     var bundle = entry.getKey();
                     return getNamespaceBundle(pulsar, bundle);
                 }).collect(Collectors.toSet());
+        // Add heartbeat and SLA monitor namespace bundle.
+        NamespaceName heartbeatNamespace = NamespaceService.getHeartbeatNamespace(brokerId, pulsar.getConfiguration());
+        try {
+            NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
+                    .getFullBundle(heartbeatNamespace);
+            ownedServiceUnits.add(fullBundle);
+        } catch (Exception e) {
+            log.warn("Failed to get heartbeat namespace bundle.", e);
+        }
+        NamespaceName heartbeatNamespaceV2 = NamespaceService
+                .getHeartbeatNamespaceV2(brokerId, pulsar.getConfiguration());
+        try {
+            NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
+                    .getFullBundle(heartbeatNamespaceV2);
+            ownedServiceUnits.add(fullBundle);
+        } catch (Exception e) {
+            log.warn("Failed to get heartbeat namespace V2 bundle.", e);
+        }
+
+        NamespaceName slaMonitorNamespace = NamespaceService
+                .getSLAMonitorNamespace(brokerId, pulsar.getConfiguration());
+        try {
+            NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
+                    .getFullBundle(slaMonitorNamespace);
+            ownedServiceUnits.add(fullBundle);
+        } catch (Exception e) {
+            log.warn("Failed to get SLA Monitor namespace bundle.", e);
+        }
+
+        return ownedServiceUnits;
     }
 
     public enum Role {
@@ -261,102 +291,108 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
         if (this.started) {
             return;
         }
-        this.brokerRegistry = new BrokerRegistryImpl(pulsar);
-        this.leaderElectionService = new LeaderElectionService(
-                pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
-                state -> {
-                    pulsar.getLoadManagerExecutor().execute(() -> {
-                        if (state == LeaderElectionState.Leading) {
-                            playLeader();
-                        } else {
-                            playFollower();
-                        }
+        try {
+            this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+            this.leaderElectionService = new LeaderElectionService(
+                    pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
+                    state -> {
+                        pulsar.getLoadManagerExecutor().execute(() -> {
+                            if (state == LeaderElectionState.Leading) {
+                                playLeader();
+                            } else {
+                                playFollower();
+                            }
+                        });
                     });
-                });
-        this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
-        this.brokerRegistry.start();
-        this.splitManager = new SplitManager(splitCounter);
-        this.unloadManager = new UnloadManager(unloadCounter);
-        this.serviceUnitStateChannel.listen(unloadManager);
-        this.serviceUnitStateChannel.listen(splitManager);
-        this.leaderElectionService.start();
-        this.serviceUnitStateChannel.start();
-        this.antiAffinityGroupPolicyHelper =
-                new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel);
-        antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
-        this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper);
-        this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter);
-        SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar);
-        this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
-        this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));
-
-        createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
-        createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
+            this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+            this.brokerRegistry.start();
+            this.splitManager = new SplitManager(splitCounter);
+            this.unloadManager = new UnloadManager(unloadCounter);
+            this.serviceUnitStateChannel.listen(unloadManager);
+            this.serviceUnitStateChannel.listen(splitManager);
+            this.leaderElectionService.start();
+            this.serviceUnitStateChannel.start();
+            this.antiAffinityGroupPolicyHelper =
+                    new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel);
+            antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
+            this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper);
+            this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter);
+            SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar);
+            this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
+            this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));
+
+            createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
+            createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
 
-        try {
-            this.brokerLoadDataStore = LoadDataStoreFactory
-                    .create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
-            this.brokerLoadDataStore.startTableView();
-            this.topBundlesLoadDataStore = LoadDataStoreFactory
-                    .create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
-        } catch (LoadDataStoreException e) {
-            throw new PulsarServerException(e);
-        }
+            try {
+                this.brokerLoadDataStore = LoadDataStoreFactory
+                        .create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
+                this.brokerLoadDataStore.startTableView();
+                this.topBundlesLoadDataStore = LoadDataStoreFactory
+                        .create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
+            } catch (LoadDataStoreException e) {
+                throw new PulsarServerException(e);
+            }
 
-        this.context = LoadManagerContextImpl.builder()
-                .configuration(conf)
-                .brokerRegistry(brokerRegistry)
-                .brokerLoadDataStore(brokerLoadDataStore)
-                .topBundleLoadDataStore(topBundlesLoadDataStore).build();
-
-        this.brokerLoadDataReporter =
-                new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore);
-
-        this.topBundleLoadDataReporter =
-                new TopBundleLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), topBundlesLoadDataStore);
-        this.serviceUnitStateChannel.listen(brokerLoadDataReporter);
-        this.serviceUnitStateChannel.listen(topBundleLoadDataReporter);
-        var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis();
-        this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
-                .scheduleAtFixedRate(() -> {
-                            try {
-                                brokerLoadDataReporter.reportAsync(false);
-                                // TODO: update broker load metrics using getLocalData
-                            } catch (Throwable e) {
-                                log.error("Failed to run the broker load manager executor job.", e);
-                            }
-                        },
-                        interval,
-                        interval, TimeUnit.MILLISECONDS);
-
-        this.topBundlesLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
-                .scheduleAtFixedRate(() -> {
-                            try {
-                                // TODO: consider excluding the bundles that are in the process of split.
-                                topBundleLoadDataReporter.reportAsync(false);
-                            } catch (Throwable e) {
-                                log.error("Failed to run the top bundles load manager executor job.", e);
-                            }
-                        },
-                        interval,
-                        interval, TimeUnit.MILLISECONDS);
-
-        this.monitorTask = this.pulsar.getLoadManagerExecutor()
-                .scheduleAtFixedRate(() -> {
-                            monitor();
-                        },
-                        MONITOR_INTERVAL_IN_MILLIS,
-                        MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);
-
-        this.unloadScheduler = new UnloadScheduler(
-                pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context,
-                serviceUnitStateChannel, unloadCounter, unloadMetrics);
-        this.unloadScheduler.start();
-        this.splitScheduler = new SplitScheduler(
-                pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
-        this.splitScheduler.start();
-        this.initWaiter.countDown();
-        this.started = true;
+            this.context = LoadManagerContextImpl.builder()
+                    .configuration(conf)
+                    .brokerRegistry(brokerRegistry)
+                    .brokerLoadDataStore(brokerLoadDataStore)
+                    .topBundleLoadDataStore(topBundlesLoadDataStore).build();
+
+            this.brokerLoadDataReporter =
+                    new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore);
+
+            this.topBundleLoadDataReporter =
+                    new TopBundleLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), topBundlesLoadDataStore);
+            this.serviceUnitStateChannel.listen(brokerLoadDataReporter);
+            this.serviceUnitStateChannel.listen(topBundleLoadDataReporter);
+            var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis();
+            this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
+                    .scheduleAtFixedRate(() -> {
+                                try {
+                                    brokerLoadDataReporter.reportAsync(false);
+                                    // TODO: update broker load metrics using getLocalData
+                                } catch (Throwable e) {
+                                    log.error("Failed to run the broker load manager executor job.", e);
+                                }
+                            },
+                            interval,
+                            interval, TimeUnit.MILLISECONDS);
+
+            this.topBundlesLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
+                    .scheduleAtFixedRate(() -> {
+                                try {
+                                    // TODO: consider excluding the bundles that are in the process of split.
+                                    topBundleLoadDataReporter.reportAsync(false);
+                                } catch (Throwable e) {
+                                    log.error("Failed to run the top bundles load manager executor job.", e);
+                                }
+                            },
+                            interval,
+                            interval, TimeUnit.MILLISECONDS);
+
+            this.monitorTask = this.pulsar.getLoadManagerExecutor()
+                    .scheduleAtFixedRate(() -> {
+                                monitor();
+                            },
+                            MONITOR_INTERVAL_IN_MILLIS,
+                            MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);
+
+            this.unloadScheduler = new UnloadScheduler(
+                    pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context,
+                    serviceUnitStateChannel, unloadCounter, unloadMetrics);
+            this.unloadScheduler.start();
+            this.splitScheduler = new SplitScheduler(
+                    pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
+            this.splitScheduler.start();
+            this.initWaiter.countDown();
+            this.started = true;
+        } catch (Exception ex) {
+            if (this.brokerRegistry != null) {
+                brokerRegistry.close();
+            }
+        }
     }
 
     @Override
@@ -377,25 +413,38 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
             if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
                 owner = serviceUnitStateChannel.getChannelOwnerAsync();
             } else {
-                owner = getOwnerAsync(serviceUnit, bundle, false).thenApply(Optional::ofNullable);
+                String candidateBrokerId = getHeartbeatOrSLAMonitorBrokerId(serviceUnit);
+                if (candidateBrokerId != null) {
+                    owner = CompletableFuture.completedFuture(Optional.of(candidateBrokerId));
+                } else {
+                    owner = getOrSelectOwnerAsync(serviceUnit, bundle).thenApply(Optional::ofNullable);
+                }
             }
             return getBrokerLookupData(owner, bundle);
         });
     }
 
-    private CompletableFuture<String> getOwnerAsync(
-            ServiceUnitId serviceUnit, String bundle, boolean ownByLocalBrokerIfAbsent) {
+    private String getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) {
+        // Check if this is Heartbeat or SLAMonitor namespace
+        String candidateBroker = NamespaceService.checkHeartbeatNamespace(serviceUnit);
+        if (candidateBroker == null) {
+            candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(serviceUnit);
+        }
+        if (candidateBroker == null) {
+            candidateBroker = NamespaceService.getSLAMonitorBrokerName(serviceUnit);
+        }
+        if (candidateBroker != null) {
+            return candidateBroker.substring(candidateBroker.lastIndexOf('/') + 1);
+        }
+        return candidateBroker;
+    }
+
+    private CompletableFuture<String> getOrSelectOwnerAsync(ServiceUnitId serviceUnit,
+                                                            String bundle) {
         return serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
             // If the bundle not assign yet, select and publish assign event to channel.
             if (broker.isEmpty()) {
-                CompletableFuture<Optional<String>> selectedBroker;
-                if (ownByLocalBrokerIfAbsent) {
-                    String brokerId = this.brokerRegistry.getBrokerId();
-                    selectedBroker = CompletableFuture.completedFuture(Optional.of(brokerId));
-                } else {
-                    selectedBroker = this.selectAsync(serviceUnit);
-                }
-                return selectedBroker.thenCompose(brokerOpt -> {
+                return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
                     if (brokerOpt.isPresent()) {
                         assignCounter.incrementSuccess();
                         log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
@@ -425,7 +474,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
         }).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
             if (brokerLookupData.isEmpty()) {
                 String errorMsg = String.format(
-                        "Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
+                        "Failed to lookup broker:%s for bundle:%s, the broker has not been registered.",
+                        broker, bundle);
                 log.error(errorMsg);
                 throw new IllegalStateException(errorMsg);
             }
@@ -443,30 +493,37 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
     public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(NamespaceBundle namespaceBundle) {
         log.info("Try acquiring ownership for bundle: {} - {}.", namespaceBundle, brokerRegistry.getBrokerId());
         final String bundle = namespaceBundle.toString();
-        return dedupeLookupRequest(bundle, k -> {
-            final CompletableFuture<String> owner =
-                    this.getOwnerAsync(namespaceBundle, bundle, true);
-            return getBrokerLookupData(owner.thenApply(Optional::ofNullable), bundle);
-        }).thenApply(brokerLookupData -> {
-            if (brokerLookupData.isEmpty()) {
-                throw new IllegalStateException(
-                        "Failed to get the broker lookup data for bundle: " + bundle);
-            }
-            return brokerLookupData.get().toNamespaceEphemeralData();
-        });
+        return assign(Optional.empty(), namespaceBundle)
+                .thenApply(brokerLookupData -> {
+                    if (brokerLookupData.isEmpty()) {
+                        String errorMsg = String.format(
+                                "Failed to get the broker lookup data for bundle:%s", bundle);
+                        log.error(errorMsg);
+                        throw new IllegalStateException(errorMsg);
+                    }
+                    return brokerLookupData.get().toNamespaceEphemeralData();
+                });
     }
 
     private CompletableFuture<Optional<BrokerLookupData>> dedupeLookupRequest(
             String key, Function<String, CompletableFuture<Optional<BrokerLookupData>>> provider) {
-        CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(key, provider);
-        future.whenComplete((r, t) -> {
-                    if (t != null) {
+        final MutableObject<CompletableFuture<Optional<BrokerLookupData>>> newFutureCreated = new MutableObject<>();
+        try {
+            return lookupRequests.computeIfAbsent(key, k -> {
+                CompletableFuture<Optional<BrokerLookupData>> future = provider.apply(k);
+                newFutureCreated.setValue(future);
+                return future;
+            });
+        } finally {
+            if (newFutureCreated.getValue() != null) {
+                newFutureCreated.getValue().whenComplete((v, ex) -> {
+                    if (ex != null) {
                         assignCounter.incrementFailure();
                     }
-                    lookupRequests.remove(key);
-                }
-        );
-        return future;
+                    lookupRequests.remove(key, newFutureCreated.getValue());
+                });
+            }
+        }
     }
 
     public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
@@ -521,15 +578,16 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
     }
 
     public CompletableFuture<Optional<String>> getOwnershipAsync(Optional<ServiceUnitId> topic,
-                                                                 ServiceUnitId bundleUnit) {
-        final String bundle = bundleUnit.toString();
-        CompletableFuture<Optional<String>> owner;
+                                                                 ServiceUnitId serviceUnit) {
+        final String bundle = serviceUnit.toString();
         if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
-            owner = serviceUnitStateChannel.getChannelOwnerAsync();
-        } else {
-            owner = serviceUnitStateChannel.getOwnerAsync(bundle);
+            return serviceUnitStateChannel.getChannelOwnerAsync();
         }
-        return owner;
+        String candidateBroker = getHeartbeatOrSLAMonitorBrokerId(serviceUnit);
+        if (candidateBroker != null) {
+            return CompletableFuture.completedFuture(Optional.of(candidateBroker));
+        }
+        return serviceUnitStateChannel.getOwnerAsync(bundle);
     }
 
     public CompletableFuture<Optional<BrokerLookupData>> getOwnershipWithLookupDataAsync(ServiceUnitId bundleUnit) {
@@ -543,6 +601,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
 
     public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
                                                               Optional<String> destinationBroker) {
+        if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
+            log.info("Skip unloading namespace bundle: {}.", bundle);
+            return CompletableFuture.completedFuture(null);
+        }
         return getOwnershipAsync(Optional.empty(), bundle)
                 .thenCompose(brokerOpt -> {
                     if (brokerOpt.isEmpty()) {
@@ -577,6 +639,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
     public CompletableFuture<Void> splitNamespaceBundleAsync(ServiceUnitId bundle,
                                                              NamespaceBundleSplitAlgorithm splitAlgorithm,
                                                              List<Long> boundaries) {
+        if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
+            log.info("Skip split namespace bundle: {}.", bundle);
+            return CompletableFuture.completedFuture(null);
+        }
         final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle.toString());
         final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle.toString());
         NamespaceBundle namespaceBundle =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 98aa02d4e72..d71513652e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -41,8 +41,6 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
 import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Stable;
 import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
 import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT;
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT_V2;
 import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
 import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG;
 import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
@@ -94,7 +92,6 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
 import org.apache.pulsar.common.naming.NamespaceBundles;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.stats.Metrics;
@@ -1216,48 +1213,19 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         log.info("Started ownership cleanup for the inactive broker:{}", broker);
         int orphanServiceUnitCleanupCnt = 0;
         long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
-        String heartbeatNamespace =
-                NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), broker)).toString();
-        String heartbeatNamespaceV2 =
-                NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, broker)).toString();
-
         Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new HashMap<>();
         for (var etr : tableview.entrySet()) {
             var stateData = etr.getValue();
             var serviceUnit = etr.getKey();
             var state = state(stateData);
-            if (StringUtils.equals(broker, stateData.dstBroker())) {
-                if (isActiveState(state)) {
-                    if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
-                        orphanSystemServiceUnits.put(serviceUnit, stateData);
-                    } else if (serviceUnit.startsWith(heartbeatNamespace)
-                            || serviceUnit.startsWith(heartbeatNamespaceV2)) {
-                        // Skip the heartbeat namespace
-                        log.info("Skip override heartbeat namespace bundle"
-                                + " serviceUnit:{}, stateData:{}", serviceUnit, stateData);
-                        tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
-                            if (e != null) {
-                                log.error("Failed cleaning the heartbeat namespace ownership serviceUnit:{}, "
-                                                + "stateData:{}, cleanupErrorCnt:{}.",
-                                        serviceUnit, stateData,
-                                        totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e);
-                            }
-                        });
-                    } else {
-                        overrideOwnership(serviceUnit, stateData, broker);
-                    }
-                    orphanServiceUnitCleanupCnt++;
-                }
-
-            } else if (StringUtils.equals(broker, stateData.sourceBroker())) {
-                if (isInFlightState(state)) {
-                    if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
-                        orphanSystemServiceUnits.put(serviceUnit, stateData);
-                    } else {
-                        overrideOwnership(serviceUnit, stateData, broker);
-                    }
-                    orphanServiceUnitCleanupCnt++;
+            if (StringUtils.equals(broker, stateData.dstBroker()) && isActiveState(state)
+                    || StringUtils.equals(broker, stateData.sourceBroker()) && isInFlightState(state)) {
+                if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
+                    orphanSystemServiceUnits.put(serviceUnit, stateData);
+                } else {
+                    overrideOwnership(serviceUnit, stateData, broker);
                 }
+                orphanServiceUnitCleanupCnt++;
             }
         }
 
@@ -1401,16 +1369,21 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
             String srcBroker = stateData.sourceBroker();
             var state = stateData.state();
 
-            if (isActiveState(state)) {
-                if (StringUtils.isNotBlank(srcBroker) && !activeBrokers.contains(srcBroker)) {
-                    inactiveBrokers.add(srcBroker);
-                } else if (StringUtils.isNotBlank(dstBroker) && !activeBrokers.contains(dstBroker)) {
-                    inactiveBrokers.add(dstBroker);
-                } else if (isInFlightState(state)
-                        && now - stateData.timestamp() > inFlightStateWaitingTimeInMillis) {
-                    orphanServiceUnits.put(serviceUnit, stateData);
-                }
-            } else if (now - stateData.timestamp() > semiTerminalStateWaitingTimeInMillis) {
+            if (isActiveState(state) && StringUtils.isNotBlank(srcBroker) && !activeBrokers.contains(srcBroker)) {
+                inactiveBrokers.add(srcBroker);
+                continue;
+            }
+            if (isActiveState(state) && StringUtils.isNotBlank(dstBroker) && !activeBrokers.contains(dstBroker)) {
+                inactiveBrokers.add(dstBroker);
+                continue;
+            }
+            if (isActiveState(state) && isInFlightState(state)
+                    && now - stateData.timestamp() > inFlightStateWaitingTimeInMillis) {
+                orphanServiceUnits.put(serviceUnit, stateData);
+                continue;
+            }
+
+            if (now - stateData.timestamp() > semiTerminalStateWaitingTimeInMillis) {
                 log.info("Found semi-terminal states to tombstone"
                         + " serviceUnit:{}, stateData:{}", serviceUnit, stateData);
                 tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
index 2f5c32197c1..624546fdff8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
@@ -30,6 +30,8 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
@@ -70,7 +72,8 @@ public class TopKBundles {
                     pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
             for (var etr : bundleStats.entrySet()) {
                 String bundle = etr.getKey();
-                if (bundle.startsWith(NamespaceName.SYSTEM_NAMESPACE.toString())) {
+                // TODO: do not filter system topic while shedding
+                if (NamespaceService.isSystemServiceNamespace(NamespaceBundle.getBundleNamespace(bundle))) {
                     continue;
                 }
                 if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled && hasPolicies(bundle)) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index d66e3c3b65d..57c0cc7c046 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -135,7 +135,7 @@ public class NamespaceService implements AutoCloseable {
     public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)");
     public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s";
     public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s";
-    public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s:%s";
+    public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s";
 
     private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl> namespaceClients;
 
@@ -189,7 +189,7 @@ public class NamespaceService implements AutoCloseable {
         CompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic)
                 .thenCompose(bundle -> {
                     // Do redirection if the cluster is in rollback or deploying.
-                    return redirectManager.findRedirectLookupResultAsync().thenCompose(optResult -> {
+                    return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
                         if (optResult.isPresent()) {
                             LOG.info("[{}] Redirect lookup request to {} for topic {}",
                                     pulsar.getSafeWebServiceAddress(), optResult.get(), topic);
@@ -221,6 +221,13 @@ public class NamespaceService implements AutoCloseable {
         return future;
     }
 
+    private CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync(ServiceUnitId bundle) {
+        if (isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
+        return redirectManager.findRedirectLookupResultAsync();
+    }
+
     public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
         return bundleFactory.getBundlesAsync(topic.getNamespaceObject())
                 .thenApply(bundles -> bundles.findBundle(topic));
@@ -288,8 +295,7 @@ public class NamespaceService implements AutoCloseable {
     private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(@Nullable ServiceUnitId topic,
                                                                       NamespaceBundle bundle,
                                                                       LookupOptions options) {
-
-        return redirectManager.findRedirectLookupResultAsync().thenCompose(optResult -> {
+        return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
             if (optResult.isPresent()) {
                 LOG.info("[{}] Redirect lookup request to {} for topic {}",
                         pulsar.getSafeWebServiceAddress(), optResult.get(), topic);
@@ -695,7 +701,7 @@ public class NamespaceService implements AutoCloseable {
         return lookupFuture;
     }
 
-    private boolean isBrokerActive(String candidateBroker) {
+    public boolean isBrokerActive(String candidateBroker) {
         String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
         Set<String> availableBrokers = getAvailableBrokers();
         if (availableBrokers.contains(candidateBrokerHostAndPort)) {
@@ -1564,7 +1570,7 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public void unloadSLANamespace() throws Exception {
-        NamespaceName namespaceName = getSLAMonitorNamespace(host, config);
+        NamespaceName namespaceName = getSLAMonitorNamespace(pulsar.getLookupServiceAddress(), config);
 
         LOG.info("Checking owner for SLA namespace {}", namespaceName);
 
@@ -1589,14 +1595,8 @@ public class NamespaceService implements AutoCloseable {
         return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, lookupBroker));
     }
 
-    public static NamespaceName getSLAMonitorNamespace(String host, ServiceConfiguration config) {
-        Integer port = null;
-        if (config.getWebServicePort().isPresent()) {
-            port = config.getWebServicePort().get();
-        } else if (config.getWebServicePortTls().isPresent()) {
-            port = config.getWebServicePortTls().get();
-        }
-        return NamespaceName.get(String.format(SLA_NAMESPACE_FMT, config.getClusterName(), host, port));
+    public static NamespaceName getSLAMonitorNamespace(String lookupBroker, ServiceConfiguration config) {
+        return NamespaceName.get(String.format(SLA_NAMESPACE_FMT, config.getClusterName(), lookupBroker));
     }
 
     public static String checkHeartbeatNamespace(ServiceUnitId ns) {
@@ -1640,7 +1640,7 @@ public class NamespaceService implements AutoCloseable {
      * @param namespace the namespace name
      * @return True if namespace is HEARTBEAT_NAMESPACE or SLA_NAMESPACE
      */
-    public static boolean filterNamespaceForShedding(String namespace) {
+    public static boolean isSLAOrHeartbeatNamespace(String namespace) {
         return SLA_NAMESPACE_PATTERN.matcher(namespace).matches()
                 || HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
                 || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
@@ -1653,14 +1653,16 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public boolean registerSLANamespace() throws PulsarServerException {
-        boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(host, config), false);
+        String lookupServiceAddress = pulsar.getLookupServiceAddress();
+        boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(lookupServiceAddress, config), false);
         if (isNameSpaceRegistered) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Added SLA Monitoring namespace name in local cache: ns={}",
-                        getSLAMonitorNamespace(host, config));
+                        getSLAMonitorNamespace(lookupServiceAddress, config));
             }
         } else if (LOG.isDebugEnabled()) {
-            LOG.debug("SLA Monitoring not owned by the broker: ns={}", getSLAMonitorNamespace(host, config));
+            LOG.debug("SLA Monitoring not owned by the broker: ns={}",
+                    getSLAMonitorNamespace(lookupServiceAddress, config));
         }
         return isNameSpaceRegistered;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 9ce57a88540..011e7174cbe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -35,6 +35,9 @@ import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecis
 import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded;
 import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded;
 import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
+import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespace;
+import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespaceV2;
+import static org.apache.pulsar.broker.namespace.NamespaceService.getSLAMonitorNamespace;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -594,6 +597,18 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
             assertTrue(webServiceUrl3.isPresent());
             assertEquals(webServiceUrl3.get().toString(), webServiceUrl1.get().toString());
 
+            List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2, pulsar3);
+            for (PulsarService pulsarService : pulsarServices) {
+                // Test lookup heartbeat namespace's topic
+                for (PulsarService pulsar : pulsarServices) {
+                    assertLookupHeartbeatOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+                }
+                // Test lookup SLA namespace's topic
+                for (PulsarService pulsar : pulsarServices) {
+                    assertLookupSLANamespaceOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+                }
+            }
+
             // Test deploy new broker with new load manager
             ServiceConfiguration conf = getDefaultConf();
             conf.setAllowAutoTopicCreation(true);
@@ -642,10 +657,48 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
                 assertTrue(webServiceUrl4.isPresent());
                 assertEquals(webServiceUrl4.get().toString(), webServiceUrl1.get().toString());
 
+                pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4);
+                for (PulsarService pulsarService : pulsarServices) {
+                    // Test lookup heartbeat namespace's topic
+                    for (PulsarService pulsar : pulsarServices) {
+                        assertLookupHeartbeatOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+                    }
+                    // Test lookup SLA namespace's topic
+                    for (PulsarService pulsar : pulsarServices) {
+                        assertLookupSLANamespaceOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+                    }
+                }
             }
         }
     }
 
+    private void assertLookupHeartbeatOwner(PulsarService pulsar,
+                                            String lookupServiceAddress,
+                                            String expectedBrokerServiceUrl) throws Exception {
+        NamespaceName heartbeatNamespaceV1 =
+                getHeartbeatNamespace(lookupServiceAddress, pulsar.getConfiguration());
+
+        String heartbeatV1Topic = heartbeatNamespaceV1.getPersistentTopicName("test");
+        assertEquals(pulsar.getAdminClient().lookups().lookupTopic(heartbeatV1Topic), expectedBrokerServiceUrl);
+
+        NamespaceName heartbeatNamespaceV2 =
+                getHeartbeatNamespaceV2(lookupServiceAddress, pulsar.getConfiguration());
+
+        String heartbeatV2Topic = heartbeatNamespaceV2.getPersistentTopicName("test");
+        assertEquals(pulsar.getAdminClient().lookups().lookupTopic(heartbeatV2Topic), expectedBrokerServiceUrl);
+    }
+
+    private void assertLookupSLANamespaceOwner(PulsarService pulsar,
+                                               String lookupServiceAddress,
+                                               String expectedBrokerServiceUrl) throws Exception {
+        NamespaceName slaMonitorNamespace = getSLAMonitorNamespace(lookupServiceAddress, pulsar.getConfiguration());
+        String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test");
+        String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+        log.info("Topic {} Lookup result: {}", slaMonitorTopic, result);
+        assertNotNull(result);
+        assertEquals(result, expectedBrokerServiceUrl);
+    }
+
     @Test
     public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws Exception {
         var topBundlesLoadDataStorePrimary =
@@ -1043,15 +1096,15 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
         admin.namespaces().deleteNamespace(namespace, true);
     }
 
-    @Test(timeOut = 30 * 1000)
+    @Test(timeOut = 30 * 1000, priority = -1)
     public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exception {
         NamespaceName heartbeatNamespacePulsar1V1 =
-                NamespaceService.getHeartbeatNamespace(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration());
+                getHeartbeatNamespace(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration());
         NamespaceName heartbeatNamespacePulsar1V2 =
                 NamespaceService.getHeartbeatNamespaceV2(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration());
 
         NamespaceName heartbeatNamespacePulsar2V1 =
-                NamespaceService.getHeartbeatNamespace(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
+                getHeartbeatNamespace(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
         NamespaceName heartbeatNamespacePulsar2V2 =
                 NamespaceService.getHeartbeatNamespaceV2(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
 
@@ -1068,22 +1121,22 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
         Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits();
         log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
         // heartbeat namespace bundle will own by pulsar1
-        assertEquals(ownedServiceUnitsByPulsar1.size(), 2);
+        assertEquals(ownedServiceUnitsByPulsar1.size(), 3);
         assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1));
         assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2));
         Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits();
         log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
-        assertEquals(ownedServiceUnitsByPulsar2.size(), 2);
+        assertEquals(ownedServiceUnitsByPulsar2.size(), 3);
         assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3));
         assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4));
         Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 =
                 admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress());
         Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 =
                 admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress());
-        assertEquals(ownedNamespacesByPulsar1.size(), 2);
+        assertEquals(ownedNamespacesByPulsar1.size(), 3);
         assertTrue(ownedNamespacesByPulsar1.containsKey(bundle1.toString()));
         assertTrue(ownedNamespacesByPulsar1.containsKey(bundle2.toString()));
-        assertEquals(ownedNamespacesByPulsar2.size(), 2);
+        assertEquals(ownedNamespacesByPulsar2.size(), 3);
         assertTrue(ownedNamespacesByPulsar2.containsKey(bundle3.toString()));
         assertTrue(ownedNamespacesByPulsar2.containsKey(bundle4.toString()));
 
@@ -1134,7 +1187,8 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
         String topic = "persistent://" + namespace + "/test";
         NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get();
         NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get();
-        assertEquals(namespaceEphemeralData.getNativeUrl(), pulsar1.getBrokerServiceUrl());
+        assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl())
+                .contains(namespaceEphemeralData.getNativeUrl()));
         admin.namespaces().deleteNamespace(namespace, true);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index de21842f458..a226df53e12 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -30,8 +30,6 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
 import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Unload;
 import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
 import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT;
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT_V2;
 import static org.apache.pulsar.metadata.api.extended.SessionEvent.ConnectionLost;
 import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected;
 import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
@@ -89,7 +87,6 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.TableViewImpl;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -639,7 +636,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
         validateMonitorCounters(leader,
                 0,
-                1,
+                3,
                 0,
                 0,
                 0,
@@ -756,34 +753,6 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         waitUntilNewOwner(channel1, bundle2, broker);
         waitUntilNewOwner(channel2, bundle2, broker);
 
-        // Register the broker-1 heartbeat namespace bundle.
-        String heartbeatNamespaceBroker1V1 = NamespaceName
-                .get(String.format(HEARTBEAT_NAMESPACE_FMT, conf.getClusterName(), broker)).toString();
-        String heartbeatNamespaceBroker1V2 = NamespaceName
-                .get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, broker)).toString();
-        String heartbeatNamespaceBroker1V1Bundle = heartbeatNamespaceBroker1V1 + "/0x00000000_0xfffffff0";
-        String heartbeatNamespaceBroker1V2Bundle = heartbeatNamespaceBroker1V2 + "/0x00000000_0xfffffff0";
-        channel1.publishAssignEventAsync(heartbeatNamespaceBroker1V1Bundle, broker);
-        channel1.publishAssignEventAsync(heartbeatNamespaceBroker1V2Bundle, broker);
-
-        // Register the broker-2 heartbeat namespace bundle.
-        String heartbeatNamespaceBroker2V1 = NamespaceName
-                .get(String.format(HEARTBEAT_NAMESPACE_FMT, conf.getClusterName(), lookupServiceAddress2)).toString();
-        String heartbeatNamespaceBroker2V2 = NamespaceName
-                .get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, lookupServiceAddress2)).toString();
-        String heartbeatNamespaceBroker2V1Bundle = heartbeatNamespaceBroker2V1 + "/0x00000000_0xfffffff0";
-        String heartbeatNamespaceBroker2V2Bundle = heartbeatNamespaceBroker2V2 + "/0x00000000_0xfffffff0";
-        channel1.publishAssignEventAsync(heartbeatNamespaceBroker2V1Bundle, lookupServiceAddress2);
-        channel1.publishAssignEventAsync(heartbeatNamespaceBroker2V2Bundle, lookupServiceAddress2);
-        waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V1Bundle, broker);
-        waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V2Bundle, broker);
-        waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V1Bundle, broker);
-        waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V2Bundle, broker);
-        waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V1Bundle, lookupServiceAddress2);
-        waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V2Bundle, lookupServiceAddress2);
-        waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V1Bundle, lookupServiceAddress2);
-        waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V2Bundle, lookupServiceAddress2);
-
         // Verify to transfer the ownership to the other broker.
         channel1.publishUnloadEventAsync(new Unload(broker, bundle1, Optional.of(lookupServiceAddress2)));
         waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
@@ -806,16 +775,6 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         waitUntilNewOwner(channel1, bundle2, lookupServiceAddress2);
         waitUntilNewOwner(channel2, bundle2, lookupServiceAddress2);
 
-        waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V1Bundle, null);
-        waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V2Bundle, null);
-        waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V1Bundle, null);
-        waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V2Bundle, null);
-
-        waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V1Bundle, null);
-        waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V2Bundle, null);
-        waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V1Bundle, null);
-        waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V2Bundle, null);
-
         verify(leaderCleanupJobs, times(1)).computeIfAbsent(eq(broker), any());
         verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any());
 
@@ -827,7 +786,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         validateMonitorCounters(leaderChannel,
                 2,
                 0,
-                7,
+                3,
                 0,
                 2,
                 0,
@@ -858,7 +817,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         validateMonitorCounters(leaderChannel,
                 2,
                 0,
-                7,
+                3,
                 0,
                 3,
                 0,
@@ -879,7 +838,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         validateMonitorCounters(leaderChannel,
                 2,
                 0,
-                7,
+                3,
                 0,
                 3,
                 0,
@@ -901,7 +860,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         validateMonitorCounters(leaderChannel,
                 2,
                 0,
-                7,
+                3,
                 0,
                 4,
                 0,
@@ -923,7 +882,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         validateMonitorCounters(leaderChannel,
                 3,
                 0,
-                9,
+                5,
                 0,
                 4,
                 0,
@@ -952,7 +911,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         validateMonitorCounters(leaderChannel,
                 3,
                 0,
-                9,
+                5,
                 0,
                 4,
                 1,
@@ -1447,7 +1406,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
 
         validateMonitorCounters(leader,
                 0,
-                1,
+                3,
                 1,
                 0,
                 0,