You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/10/24 14:31:29 UTC

[pulsar] branch branch-3.0 updated: [fix][broker] Fix unload operation stuck when use ExtensibleLoadManager (#21332)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c6bffc2605d [fix][broker] Fix unload operation stuck when use ExtensibleLoadManager (#21332)
c6bffc2605d is described below

commit c6bffc2605d05c94eaca20d23502fb368926087f
Author: Kai Wang <kw...@apache.org>
AuthorDate: Thu Oct 19 07:08:49 2023 -0500

    [fix][broker] Fix unload operation stuck when use ExtensibleLoadManager (#21332)
---
 .../extensions/ExtensibleLoadManagerImpl.java      |   2 +-
 .../channel/ServiceUnitStateChannelImpl.java       |  17 +-
 .../extensions/manager/UnloadManager.java          |   7 +
 .../pulsar/broker/namespace/NamespaceService.java  |   4 -
 .../pulsar/broker/service/BrokerService.java       |  15 +
 .../extensions/ExtensibleLoadManagerImplTest.java  | 311 ++++++++++++---------
 .../channel/ServiceUnitStateChannelTest.java       |  12 +-
 7 files changed, 219 insertions(+), 149 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 85baf9ec4fb..d3119365ddf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -304,7 +304,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
                             }
                         });
                     });
-            this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+            this.serviceUnitStateChannel = ServiceUnitStateChannelImpl.newInstance(pulsar);
             this.brokerRegistry.start();
             this.splitManager = new SplitManager(splitCounter);
             this.unloadManager = new UnloadManager(unloadCounter);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 02acb923c2d..68501d201f0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -200,7 +200,18 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         Unstable
     }
 
+    public static ServiceUnitStateChannelImpl newInstance(PulsarService pulsar) {
+        return new ServiceUnitStateChannelImpl(pulsar);
+    }
+
     public ServiceUnitStateChannelImpl(PulsarService pulsar) {
+        this(pulsar, MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS, OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS);
+    }
+
+    @VisibleForTesting
+    public ServiceUnitStateChannelImpl(PulsarService pulsar,
+                                       long inFlightStateWaitingTimeInMillis,
+                                       long ownershipMonitorDelayTimeInSecs) {
         this.pulsar = pulsar;
         this.config = pulsar.getConfig();
         this.lookupServiceAddress = pulsar.getLookupServiceAddress();
@@ -210,8 +221,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         this.stateChangeListeners = new StateChangeListeners();
         this.semiTerminalStateWaitingTimeInMillis = config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds()
                 * 1000;
-        this.inFlightStateWaitingTimeInMillis = MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS;
-        this.ownershipMonitorDelayTimeInSecs = OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS;
+        this.inFlightStateWaitingTimeInMillis = inFlightStateWaitingTimeInMillis;
+        this.ownershipMonitorDelayTimeInSecs = ownershipMonitorDelayTimeInSecs;
         if (semiTerminalStateWaitingTimeInMillis < inFlightStateWaitingTimeInMillis) {
             throw new IllegalArgumentException(
                     "Invalid Config: loadBalancerServiceUnitStateCleanUpDelayTimeInSeconds < "
@@ -837,7 +848,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         } finally {
             var future = requested.getValue();
             if (future != null) {
-                future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS)
+                future.orTimeout(inFlightStateWaitingTimeInMillis + 5 * 1000, TimeUnit.MILLISECONDS)
                         .whenComplete((v, e) -> {
                                     if (e != null) {
                                         getOwnerRequests.remove(serviceUnit, future);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
index 2dde0c4708e..ffdbbc2af42 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
@@ -88,6 +88,13 @@ public class UnloadManager implements StateChangeListener {
 
     @Override
     public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
+        if (t != null && inFlightUnloadRequest.containsKey(serviceUnit)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Handling {} for service unit {} with exception.", data, serviceUnit, t);
+            }
+            this.complete(serviceUnit, t);
+            return;
+        }
         ServiceUnitState state = ServiceUnitStateData.state(data);
         switch (state) {
             case Free, Owned -> this.complete(serviceUnit, t);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 0d69fd7ea78..4c2b4211747 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1544,10 +1544,6 @@ public class NamespaceService implements AutoCloseable {
 
     public CompletableFuture<Boolean> checkOwnershipPresentAsync(NamespaceBundle bundle) {
         if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
-            if (bundle.getNamespaceObject().equals(SYSTEM_NAMESPACE)) {
-                return FutureUtil.failedFuture(new UnsupportedOperationException(
-                        "Ownership check for system namespace is not supported"));
-            }
             ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
             return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle)
                     .thenApply(Optional::isPresent);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7c7dec864e2..b5bd7bfaa3d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2202,6 +2202,21 @@ public class BrokerService implements Closeable {
             if (serviceUnit.includes(topicName)) {
                 // Topic needs to be unloaded
                 log.info("[{}] Unloading topic", topicName);
+                if (topicFuture.isCompletedExceptionally()) {
+                    try {
+                        topicFuture.get();
+                    } catch (InterruptedException | ExecutionException ex) {
+                        if (ex.getCause() instanceof ServiceUnitNotReadyException) {
+                            // Topic was already unloaded
+                            if (log.isDebugEnabled()) {
+                                log.debug("[{}] Topic was already unloaded", topicName);
+                            }
+                            return;
+                        } else {
+                            log.warn("[{}] Got exception when closing topic", topicName, ex);
+                        }
+                    }
+                }
                 closeFutures.add(topicFuture
                         .thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect)
                                 : CompletableFuture.completedFuture(null)));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 2d9de4b5e7f..f499998fd3d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -38,9 +38,11 @@ import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecis
 import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespace;
 import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespaceV2;
 import static org.apache.pulsar.broker.namespace.NamespaceService.getSLAMonitorNamespace;
+import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -111,7 +113,8 @@ import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.awaitility.Awaitility;
+import org.mockito.MockedStatic;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -141,46 +144,56 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
     @BeforeClass
     @Override
     public void setup() throws Exception {
-        conf.setForceDeleteNamespaceAllowed(true);
-        conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
-        conf.setAllowAutoTopicCreation(true);
-        conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
-        conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
-        conf.setLoadBalancerSheddingEnabled(false);
-        conf.setLoadBalancerDebugModeEnabled(true);
-        conf.setTopicLevelPoliciesEnabled(false);
-        super.internalSetup(conf);
-        pulsar1 = pulsar;
-        ServiceConfiguration defaultConf = getDefaultConf();
-        defaultConf.setAllowAutoTopicCreation(true);
-        defaultConf.setForceDeleteNamespaceAllowed(true);
-        defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
-        defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
-        defaultConf.setLoadBalancerSheddingEnabled(false);
-        defaultConf.setTopicLevelPoliciesEnabled(false);
-        additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
-        pulsar2 = additionalPulsarTestContext.getPulsarService();
-
-        setPrimaryLoadManager();
-
-        setSecondaryLoadManager();
-
-        admin.clusters().createCluster(this.conf.getClusterName(),
-                ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
-        admin.tenants().createTenant("public",
-                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
-                        Sets.newHashSet(this.conf.getClusterName())));
-        admin.namespaces().createNamespace("public/default");
-        admin.namespaces().setNamespaceReplicationClusters("public/default",
-                Sets.newHashSet(this.conf.getClusterName()));
-
-        admin.namespaces().createNamespace(defaultTestNamespace);
-        admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
-                Sets.newHashSet(this.conf.getClusterName()));
+        try (MockedStatic<ServiceUnitStateChannelImpl> channelMockedStatic =
+                     mockStatic(ServiceUnitStateChannelImpl.class)) {
+            channelMockedStatic.when(() -> ServiceUnitStateChannelImpl.newInstance(isA(PulsarService.class)))
+                    .thenAnswer(invocation -> {
+                        PulsarService pulsarService = invocation.getArgument(0);
+                        // Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid
+                        // stuck when doing unload.
+                        return new ServiceUnitStateChannelImpl(pulsarService, 5 * 1000, 1);
+                    });
+            conf.setForceDeleteNamespaceAllowed(true);
+            conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+            conf.setAllowAutoTopicCreation(true);
+            conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+            conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+            conf.setLoadBalancerSheddingEnabled(false);
+            conf.setLoadBalancerDebugModeEnabled(true);
+            conf.setTopicLevelPoliciesEnabled(true);
+            super.internalSetup(conf);
+            pulsar1 = pulsar;
+            ServiceConfiguration defaultConf = getDefaultConf();
+            defaultConf.setAllowAutoTopicCreation(true);
+            defaultConf.setForceDeleteNamespaceAllowed(true);
+            defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+            defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+            defaultConf.setLoadBalancerSheddingEnabled(false);
+            defaultConf.setTopicLevelPoliciesEnabled(true);
+            additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
+            pulsar2 = additionalPulsarTestContext.getPulsarService();
+
+            setPrimaryLoadManager();
+
+            setSecondaryLoadManager();
+
+            admin.clusters().createCluster(this.conf.getClusterName(),
+                    ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+            admin.tenants().createTenant("public",
+                    new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
+                            Sets.newHashSet(this.conf.getClusterName())));
+            admin.namespaces().createNamespace("public/default");
+            admin.namespaces().setNamespaceReplicationClusters("public/default",
+                    Sets.newHashSet(this.conf.getClusterName()));
+
+            admin.namespaces().createNamespace(defaultTestNamespace);
+            admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
+                    Sets.newHashSet(this.conf.getClusterName()));
+        }
     }
 
     @Override
-    @AfterClass
+    @AfterClass(alwaysRun = true)
     protected void cleanup() throws Exception {
         pulsar1 = null;
         pulsar2.close();
@@ -538,119 +551,134 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testDeployAndRollbackLoadManager() throws Exception {
-        // Test rollback to modular load manager.
-        ServiceConfiguration defaultConf = getDefaultConf();
-        defaultConf.setAllowAutoTopicCreation(true);
-        defaultConf.setForceDeleteNamespaceAllowed(true);
-        defaultConf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
-        defaultConf.setLoadBalancerSheddingEnabled(false);
-        try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) {
-            // start pulsar3 with old load manager
-            var pulsar3 = additionalPulsarTestContext.getPulsarService();
-            String topic = "persistent://" + defaultTestNamespace + "/test";
-
-            String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic);
-            assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl());
-
-            String lookupResult2 = pulsar1.getAdminClient().lookups().lookupTopic(topic);
-            String lookupResult3 = pulsar2.getAdminClient().lookups().lookupTopic(topic);
-            assertEquals(lookupResult1, lookupResult2);
-            assertEquals(lookupResult1, lookupResult3);
-
-            NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get();
-            LookupOptions options = LookupOptions.builder()
-                    .authoritative(false)
-                    .requestHttps(false)
-                    .readOnly(false)
-                    .loadTopicsInBundle(false).build();
-            Optional<URL> webServiceUrl1 =
-                    pulsar1.getNamespaceService().getWebServiceUrl(bundle, options);
-            assertTrue(webServiceUrl1.isPresent());
-            assertEquals(webServiceUrl1.get().toString(), pulsar3.getWebServiceAddress());
-
-            Optional<URL> webServiceUrl2 =
-                    pulsar2.getNamespaceService().getWebServiceUrl(bundle, options);
-            assertTrue(webServiceUrl2.isPresent());
-            assertEquals(webServiceUrl2.get().toString(), webServiceUrl1.get().toString());
-
-            Optional<URL> webServiceUrl3 =
-                    pulsar3.getNamespaceService().getWebServiceUrl(bundle, options);
-            assertTrue(webServiceUrl3.isPresent());
-            assertEquals(webServiceUrl3.get().toString(), webServiceUrl1.get().toString());
-
-            List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2, pulsar3);
-            for (PulsarService pulsarService : pulsarServices) {
-                // Test lookup heartbeat namespace's topic
-                for (PulsarService pulsar : pulsarServices) {
-                    assertLookupHeartbeatOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
-                }
-                // Test lookup SLA namespace's topic
-                for (PulsarService pulsar : pulsarServices) {
-                    assertLookupSLANamespaceOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
-                }
-            }
-
-            // Test deploy new broker with new load manager
-            ServiceConfiguration conf = getDefaultConf();
-            conf.setAllowAutoTopicCreation(true);
-            conf.setForceDeleteNamespaceAllowed(true);
-            conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
-            conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
-            try (var additionPulsarTestContext = createAdditionalPulsarTestContext(conf)) {
-                var pulsar4 = additionPulsarTestContext.getPulsarService();
-
-                Set<String> availableCandidates = Sets.newHashSet(pulsar1.getBrokerServiceUrl(),
-                        pulsar2.getBrokerServiceUrl(),
-                        pulsar4.getBrokerServiceUrl());
-                String lookupResult4 = pulsar4.getAdminClient().lookups().lookupTopic(topic);
-                assertTrue(availableCandidates.contains(lookupResult4));
-
-                String lookupResult5 = pulsar1.getAdminClient().lookups().lookupTopic(topic);
-                String lookupResult6 = pulsar2.getAdminClient().lookups().lookupTopic(topic);
-                String lookupResult7 = pulsar3.getAdminClient().lookups().lookupTopic(topic);
-                assertEquals(lookupResult4, lookupResult5);
-                assertEquals(lookupResult4, lookupResult6);
-                assertEquals(lookupResult4, lookupResult7);
-
-                Set<String> availableWebUrlCandidates = Sets.newHashSet(pulsar1.getWebServiceAddress(),
-                        pulsar2.getWebServiceAddress(),
-                        pulsar4.getWebServiceAddress());
-
-                webServiceUrl1 =
+        try (MockedStatic<ServiceUnitStateChannelImpl> channelMockedStatic =
+                     mockStatic(ServiceUnitStateChannelImpl.class)) {
+            channelMockedStatic.when(() -> ServiceUnitStateChannelImpl.newInstance(isA(PulsarService.class)))
+                    .thenAnswer(invocation -> {
+                        PulsarService pulsarService = invocation.getArgument(0);
+                        // Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid
+                        // stuck when doing unload.
+                        return new ServiceUnitStateChannelImpl(pulsarService, 5 * 1000, 1);
+                    });
+            // Test rollback to modular load manager.
+            ServiceConfiguration defaultConf = getDefaultConf();
+            defaultConf.setAllowAutoTopicCreation(true);
+            defaultConf.setForceDeleteNamespaceAllowed(true);
+            defaultConf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+            defaultConf.setLoadBalancerSheddingEnabled(false);
+            try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) {
+                // start pulsar3 with old load manager
+                var pulsar3 = additionalPulsarTestContext.getPulsarService();
+                String topic = "persistent://" + defaultTestNamespace + "/test";
+
+                String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic);
+                assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl());
+
+                String lookupResult2 = pulsar1.getAdminClient().lookups().lookupTopic(topic);
+                String lookupResult3 = pulsar2.getAdminClient().lookups().lookupTopic(topic);
+                assertEquals(lookupResult1, lookupResult2);
+                assertEquals(lookupResult1, lookupResult3);
+
+                NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get();
+                LookupOptions options = LookupOptions.builder()
+                        .authoritative(false)
+                        .requestHttps(false)
+                        .readOnly(false)
+                        .loadTopicsInBundle(false).build();
+                Optional<URL> webServiceUrl1 =
                         pulsar1.getNamespaceService().getWebServiceUrl(bundle, options);
                 assertTrue(webServiceUrl1.isPresent());
-                assertTrue(availableWebUrlCandidates.contains(webServiceUrl1.get().toString()));
+                assertEquals(webServiceUrl1.get().toString(), pulsar3.getWebServiceAddress());
 
-                webServiceUrl2 =
+                Optional<URL> webServiceUrl2 =
                         pulsar2.getNamespaceService().getWebServiceUrl(bundle, options);
                 assertTrue(webServiceUrl2.isPresent());
                 assertEquals(webServiceUrl2.get().toString(), webServiceUrl1.get().toString());
 
-                // The pulsar3 will redirect to pulsar4
-                webServiceUrl3 =
+                Optional<URL> webServiceUrl3 =
                         pulsar3.getNamespaceService().getWebServiceUrl(bundle, options);
                 assertTrue(webServiceUrl3.isPresent());
-                // It will redirect to pulsar4
-                assertTrue(availableWebUrlCandidates.contains(webServiceUrl3.get().toString()));
-
-                var webServiceUrl4 =
-                        pulsar4.getNamespaceService().getWebServiceUrl(bundle, options);
-                assertTrue(webServiceUrl4.isPresent());
-                assertEquals(webServiceUrl4.get().toString(), webServiceUrl1.get().toString());
+                assertEquals(webServiceUrl3.get().toString(), webServiceUrl1.get().toString());
 
-                pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4);
+                List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2, pulsar3);
                 for (PulsarService pulsarService : pulsarServices) {
                     // Test lookup heartbeat namespace's topic
                     for (PulsarService pulsar : pulsarServices) {
-                        assertLookupHeartbeatOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+                        assertLookupHeartbeatOwner(pulsarService,
+                                pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
                     }
                     // Test lookup SLA namespace's topic
                     for (PulsarService pulsar : pulsarServices) {
-                        assertLookupSLANamespaceOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+                        assertLookupSLANamespaceOwner(pulsarService,
+                                pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+                    }
+                }
+
+                // Test deploy new broker with new load manager
+                ServiceConfiguration conf = getDefaultConf();
+                conf.setAllowAutoTopicCreation(true);
+                conf.setForceDeleteNamespaceAllowed(true);
+                conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+                conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+                try (var additionPulsarTestContext = createAdditionalPulsarTestContext(conf)) {
+                    var pulsar4 = additionPulsarTestContext.getPulsarService();
+
+                    Set<String> availableCandidates = Sets.newHashSet(pulsar1.getBrokerServiceUrl(),
+                            pulsar2.getBrokerServiceUrl(),
+                            pulsar4.getBrokerServiceUrl());
+                    String lookupResult4 = pulsar4.getAdminClient().lookups().lookupTopic(topic);
+                    assertTrue(availableCandidates.contains(lookupResult4));
+
+                    String lookupResult5 = pulsar1.getAdminClient().lookups().lookupTopic(topic);
+                    String lookupResult6 = pulsar2.getAdminClient().lookups().lookupTopic(topic);
+                    String lookupResult7 = pulsar3.getAdminClient().lookups().lookupTopic(topic);
+                    assertEquals(lookupResult4, lookupResult5);
+                    assertEquals(lookupResult4, lookupResult6);
+                    assertEquals(lookupResult4, lookupResult7);
+
+                    Set<String> availableWebUrlCandidates = Sets.newHashSet(pulsar1.getWebServiceAddress(),
+                            pulsar2.getWebServiceAddress(),
+                            pulsar4.getWebServiceAddress());
+
+                    webServiceUrl1 =
+                            pulsar1.getNamespaceService().getWebServiceUrl(bundle, options);
+                    assertTrue(webServiceUrl1.isPresent());
+                    assertTrue(availableWebUrlCandidates.contains(webServiceUrl1.get().toString()));
+
+                    webServiceUrl2 =
+                            pulsar2.getNamespaceService().getWebServiceUrl(bundle, options);
+                    assertTrue(webServiceUrl2.isPresent());
+                    assertEquals(webServiceUrl2.get().toString(), webServiceUrl1.get().toString());
+
+                    // The pulsar3 will redirect to pulsar4
+                    webServiceUrl3 =
+                            pulsar3.getNamespaceService().getWebServiceUrl(bundle, options);
+                    assertTrue(webServiceUrl3.isPresent());
+                    // It will redirect to pulsar4
+                    assertTrue(availableWebUrlCandidates.contains(webServiceUrl3.get().toString()));
+
+                    var webServiceUrl4 =
+                            pulsar4.getNamespaceService().getWebServiceUrl(bundle, options);
+                    assertTrue(webServiceUrl4.isPresent());
+                    assertEquals(webServiceUrl4.get().toString(), webServiceUrl1.get().toString());
+
+                    pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4);
+                    for (PulsarService pulsarService : pulsarServices) {
+                        // Test lookup heartbeat namespace's topic
+                        for (PulsarService pulsar : pulsarServices) {
+                            assertLookupHeartbeatOwner(pulsarService,
+                                    pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+                        }
+                        // Test lookup SLA namespace's topic
+                        for (PulsarService pulsar : pulsarServices) {
+                            assertLookupSLANamespaceOwner(pulsarService,
+                                    pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+                        }
                     }
                 }
             }
         }
+
     }
 
     private void assertLookupHeartbeatOwner(PulsarService pulsar,
@@ -1089,6 +1117,12 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
         NamespaceName heartbeatNamespacePulsar2V2 =
                 NamespaceService.getHeartbeatNamespaceV2(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
 
+        NamespaceName slaMonitorNamespacePulsar1 =
+                getSLAMonitorNamespace(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration());
+
+        NamespaceName slaMonitorNamespacePulsar2 =
+                getSLAMonitorNamespace(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
+
         NamespaceBundle bundle1 = pulsar1.getNamespaceService().getNamespaceBundleFactory()
                 .getFullBundle(heartbeatNamespacePulsar1V1);
         NamespaceBundle bundle2 = pulsar1.getNamespaceService().getNamespaceBundleFactory()
@@ -1099,27 +1133,34 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
         NamespaceBundle bundle4 = pulsar2.getNamespaceService().getNamespaceBundleFactory()
                 .getFullBundle(heartbeatNamespacePulsar2V2);
 
+        NamespaceBundle slaBundle1 = pulsar1.getNamespaceService().getNamespaceBundleFactory()
+                .getFullBundle(slaMonitorNamespacePulsar1);
+        NamespaceBundle slaBundle2 = pulsar2.getNamespaceService().getNamespaceBundleFactory()
+                .getFullBundle(slaMonitorNamespacePulsar2);
+
+
         Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits();
         log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
         // heartbeat namespace bundle will own by pulsar1
-        assertEquals(ownedServiceUnitsByPulsar1.size(), 3);
         assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1));
         assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2));
+        assertTrue(ownedServiceUnitsByPulsar1.contains(slaBundle1));
         Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits();
         log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
-        assertEquals(ownedServiceUnitsByPulsar2.size(), 3);
         assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3));
         assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4));
+        assertTrue(ownedServiceUnitsByPulsar2.contains(slaBundle2));
         Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 =
                 admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress());
         Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 =
                 admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress());
-        assertEquals(ownedNamespacesByPulsar1.size(), 3);
         assertTrue(ownedNamespacesByPulsar1.containsKey(bundle1.toString()));
         assertTrue(ownedNamespacesByPulsar1.containsKey(bundle2.toString()));
-        assertEquals(ownedNamespacesByPulsar2.size(), 3);
+        assertTrue(ownedNamespacesByPulsar1.containsKey(slaBundle1.toString()));
+
         assertTrue(ownedNamespacesByPulsar2.containsKey(bundle3.toString()));
         assertTrue(ownedNamespacesByPulsar2.containsKey(bundle4.toString()));
+        assertTrue(ownedNamespacesByPulsar2.containsKey(slaBundle2.toString()));
 
         String topic = "persistent://" + defaultTestNamespace + "/test-get-owned-service-units";
         admin.topics().createPartitionedTopic(topic, 1);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 990408d214b..e8ccd7b01ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -507,10 +507,10 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         assertEquals(1, getOwnerRequests1.size());
         assertEquals(1, getOwnerRequests2.size());
 
-        // In 5 secs, the getOwnerAsync requests(lookup requests) should time out.
-        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+        // In 10 secs, the getOwnerAsync requests(lookup requests) should time out.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertTrue(owner1.isCompletedExceptionally()));
-        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertTrue(owner2.isCompletedExceptionally()));
 
         assertEquals(0, getOwnerRequests1.size());
@@ -1139,10 +1139,10 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         assertFalse(owner1.isDone());
         assertFalse(owner2.isDone());
 
-        // In 5 secs, the getOwnerAsync requests(lookup requests) should time out.
-        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+        // In 10 secs, the getOwnerAsync requests(lookup requests) should time out.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertTrue(owner1.isCompletedExceptionally()));
-        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertTrue(owner2.isCompletedExceptionally()));
 
         // recovered, check the monitor update state : Assigned -> Owned