You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by kw...@apache.org on 2023/03/14 01:39:16 UTC

[pulsar] branch master updated: [improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl (#19622)

This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a85dea53a6 [improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl (#19622)
9a85dea53a6 is described below

commit 9a85dea53a6d0896cc4ba1ceb1ab11a47d5d65da
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Mon Mar 13 18:39:07 2023 -0700

    [improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl (#19622)
    
    Master Issue: https://github.com/apache/pulsar/issues/16691
    
    ### Motivation
    
    We will start raising PRs to implement PIP-192, https://github.com/apache/pulsar/issues/16691
    
    ### Modifications
    
    This PR implements:
    - SplitScheduler
    - DefaultNamespaceBundleSplitStrategyImpl
    - SplitManager
    - and their unit tests.
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  24 ++
 .../extensions/ExtensibleLoadManagerImpl.java      |  22 +-
 .../extensions/manager/SplitManager.java           | 119 +++++++++
 .../extensions/models/SplitCounter.java            |  46 ++--
 .../extensions/models/SplitDecision.java           |   9 -
 .../extensions/scheduler/SplitScheduler.java       | 177 +++++++++++++
 .../DefaultNamespaceBundleSplitStrategyImpl.java   | 171 +++++++++++++
 .../NamespaceBundleSplitStrategy.java              |   7 +-
 .../extensions/ExtensibleLoadManagerImplTest.java  |  31 +--
 .../extensions/manager/SplitManagerTest.java       | 222 ++++++++++++++++
 .../extensions/scheduler/SplitSchedulerTest.java   | 158 ++++++++++++
 .../DefaultNamespaceBundleSplitStrategyTest.java   | 284 +++++++++++++++++++++
 12 files changed, 1208 insertions(+), 62 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ec5b0d4042b..3c00e905ac7 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2557,6 +2557,30 @@ public class ServiceConfiguration implements PulsarConfiguration {
                     + "(only used in load balancer extension logics)"
     )
     private double loadBalancerBundleLoadReportPercentage = 10;
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "Service units'(bundles) split interval. Broker periodically checks whether "
+                    + "some service units(e.g. bundles) should split if they become hot-spots. "
+                    + "(only used in load balancer extension logics)"
+    )
+    private int loadBalancerSplitIntervalMinutes = 1;
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            dynamic = true,
+            doc = "Max number of bundles to split to per cycle. "
+                    + "(only used in load balancer extension logics)"
+    )
+    private int loadBalancerMaxNumberOfBundlesToSplitPerCycle = 10;
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            dynamic = true,
+            doc = "Threshold to the consecutive count of fulfilled split conditions. "
+                    + "If the split scheduler consecutively finds bundles that meet split conditions "
+                    + "many times bigger than this threshold, the scheduler will trigger splits on the bundles "
+                    + "(if the number of bundles is less than loadBalancerNamespaceMaximumBundles). "
+                    + "(only used in load balancer extension logics)"
+    )
+    private int loadBalancerNamespaceBundleSplitConditionThreshold = 5;
 
     @FieldContext(
             category = CATEGORY_LOAD_BALANCER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 82790f44fcb..c8ac8e46845 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -45,16 +45,17 @@ import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
 import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerIsolationPoliciesFilter;
 import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerMaxTopicCountFilter;
 import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager;
 import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
 import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
 import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
-import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
 import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
 import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter;
 import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
+import org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler;
 import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
@@ -103,7 +104,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
 
     @Getter
     private final List<BrokerFilter> brokerFilterPipeline;
-
     /**
      * The load data reporter.
      */
@@ -113,9 +113,12 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
 
     private ScheduledFuture brokerLoadDataReportTask;
     private ScheduledFuture topBundlesLoadDataReportTask;
+    private SplitScheduler splitScheduler;
 
     private UnloadManager unloadManager;
 
+    private SplitManager splitManager;
+
     private boolean started = false;
 
     private final AssignCounter assignCounter = new AssignCounter();
@@ -166,7 +169,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
         this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
         this.brokerRegistry.start();
         this.unloadManager = new UnloadManager();
+        this.splitManager = new SplitManager(splitCounter);
         this.serviceUnitStateChannel.listen(unloadManager);
+        this.serviceUnitStateChannel.listen(splitManager);
         this.serviceUnitStateChannel.start();
 
         try {
@@ -184,7 +189,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
                 .brokerLoadDataStore(brokerLoadDataStore)
                 .topBundleLoadDataStore(topBundlesLoadDataStore).build();
 
-
         this.brokerLoadDataReporter =
                 new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore);
 
@@ -216,10 +220,12 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
                         interval,
                         interval, TimeUnit.MILLISECONDS);
 
-        // TODO: Start bundle split scheduler.
         this.unloadScheduler = new UnloadScheduler(
                 pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel);
         this.unloadScheduler.start();
+        this.splitScheduler = new SplitScheduler(
+                pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
+        this.splitScheduler.start();
         this.started = true;
     }
 
@@ -380,6 +386,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
             this.brokerLoadDataStore.close();
             this.topBundlesLoadDataStore.close();
             this.unloadScheduler.close();
+            this.splitScheduler.close();
         } catch (IOException ex) {
             throw new PulsarServerException(ex);
         } finally {
@@ -411,13 +418,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
         this.unloadMetrics.set(unloadCounter.toMetrics(pulsar.getAdvertisedAddress()));
     }
 
-    private void updateSplitMetrics(List<SplitDecision> decisions) {
-        for (var decision : decisions) {
-            splitCounter.update(decision);
-        }
-        this.splitMetrics.set(splitCounter.toMetrics(pulsar.getAdvertisedAddress()));
-    }
-
     public List<Metrics> getMetrics() {
         List<Metrics> metricsCollection = new ArrayList<>();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java
new file mode 100644
index 00000000000..71ebbc92a87
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+
+/**
+ * Split manager.
+ */
+@Slf4j
+public class SplitManager implements StateChangeListener {
+
+
+    private final Map<String, CompletableFuture<Void>> inFlightSplitRequests;
+
+    private final SplitCounter counter;
+
+    public SplitManager(SplitCounter splitCounter) {
+        this.inFlightSplitRequests = new ConcurrentHashMap<>();
+        this.counter = splitCounter;
+    }
+
+    private void complete(String serviceUnit, Throwable ex) {
+        inFlightSplitRequests.computeIfPresent(serviceUnit, (__, future) -> {
+            if (!future.isDone()) {
+                if (ex != null) {
+                    future.completeExceptionally(ex);
+                } else {
+                    future.complete(null);
+                }
+            }
+            return null;
+        });
+    }
+
+    public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,
+                                             String bundle,
+                                             SplitDecision decision,
+                                             long timeout,
+                                             TimeUnit timeoutUnit) {
+        return eventPubFuture
+                .thenCompose(__ -> inFlightSplitRequests.computeIfAbsent(bundle, ignore -> {
+                    log.info("Published the bundle split event for bundle:{}. "
+                                    + "Waiting the split event to complete. Timeout: {} {}",
+                            bundle, timeout, timeoutUnit);
+                    CompletableFuture<Void> future = new CompletableFuture<>();
+                    future.orTimeout(timeout, timeoutUnit).whenComplete((v, ex) -> {
+                        if (ex != null) {
+                            inFlightSplitRequests.remove(bundle);
+                            log.warn("Timed out while waiting for the bundle split event: {}", bundle, ex);
+                        }
+                    });
+                    return future;
+                }))
+                .whenComplete((__, ex) -> {
+                    if (ex != null) {
+                        log.error("Failed the bundle split event for bundle:{}", bundle, ex);
+                        counter.update(Failure, Unknown);
+                    } else {
+                        log.info("Completed the bundle split event for bundle:{}", bundle);
+                        counter.update(decision);
+                    }
+                });
+    }
+
+    @Override
+    public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
+        ServiceUnitState state = ServiceUnitStateData.state(data);
+        if (t != null && inFlightSplitRequests.containsKey(serviceUnit)) {
+            this.complete(serviceUnit, t);
+            return;
+        }
+        switch (state) {
+            case Deleted, Owned, Init -> this.complete(serviceUnit, t);
+            default -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("Handling {} for service unit {}", data, serviceUnit);
+                }
+            }
+        }
+    }
+
+    public void close() {
+        inFlightSplitRequests.forEach((bundle, future) -> {
+            if (!future.isDone()) {
+                String msg = String.format("Splitting bundle: %s, but the manager already closed.", bundle);
+                log.warn(msg);
+                future.completeExceptionally(new IllegalStateException(msg));
+            }
+        });
+        inFlightSplitRequests.clear();
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java
index 99406412cee..ed72b5f5863 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java
@@ -19,10 +19,8 @@
 package org.apache.pulsar.broker.loadbalance.extensions.models;
 
 import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
-import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Skip;
 import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
 import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
-import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Balanced;
 import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth;
 import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate;
 import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions;
@@ -32,7 +30,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.lang3.mutable.MutableLong;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.pulsar.common.stats.Metrics;
 
 /**
@@ -40,23 +38,20 @@ import org.apache.pulsar.common.stats.Metrics;
  */
 public class SplitCounter {
 
-    long splitCount = 0;
-
-    final Map<SplitDecision.Label, Map<SplitDecision.Reason, MutableLong>> breakdownCounters;
+    private long splitCount = 0;
+    private final Map<SplitDecision.Label, Map<SplitDecision.Reason, AtomicLong>> breakdownCounters;
+    private volatile long updatedAt = 0;
 
     public SplitCounter() {
         breakdownCounters = Map.of(
                 Success, Map.of(
-                        Topics, new MutableLong(),
-                        Sessions, new MutableLong(),
-                        MsgRate, new MutableLong(),
-                        Bandwidth, new MutableLong(),
-                        Admin, new MutableLong()),
-                Skip, Map.of(
-                        Balanced, new MutableLong()
-                        ),
+                        Topics, new AtomicLong(),
+                        Sessions, new AtomicLong(),
+                        MsgRate, new AtomicLong(),
+                        Bandwidth, new AtomicLong(),
+                        Admin, new AtomicLong()),
                 Failure, Map.of(
-                        Unknown, new MutableLong())
+                        Unknown, new AtomicLong())
         );
     }
 
@@ -64,7 +59,16 @@ public class SplitCounter {
         if (decision.label == Success) {
             splitCount++;
         }
-        breakdownCounters.get(decision.getLabel()).get(decision.getReason()).increment();
+        breakdownCounters.get(decision.getLabel()).get(decision.getReason()).incrementAndGet();
+        updatedAt = System.currentTimeMillis();
+    }
+
+    public void update(SplitDecision.Label label, SplitDecision.Reason reason) {
+        if (label == Success) {
+            splitCount++;
+        }
+        breakdownCounters.get(label).get(reason).incrementAndGet();
+        updatedAt = System.currentTimeMillis();
     }
 
     public List<Metrics> toMetrics(String advertisedBrokerAddress) {
@@ -77,17 +81,18 @@ public class SplitCounter {
         m.put("brk_lb_bundles_split_total", splitCount);
         metrics.add(m);
 
-        for (Map.Entry<SplitDecision.Label, Map<SplitDecision.Reason, MutableLong>> etr
+
+        for (Map.Entry<SplitDecision.Label, Map<SplitDecision.Reason, AtomicLong>> etr
                 : breakdownCounters.entrySet()) {
             var result = etr.getKey();
-            for (Map.Entry<SplitDecision.Reason, MutableLong> counter : etr.getValue().entrySet()) {
+            for (Map.Entry<SplitDecision.Reason, AtomicLong> counter : etr.getValue().entrySet()) {
                 var reason = counter.getKey();
                 var count = counter.getValue();
                 Map<String, String> breakdownDims = new HashMap<>(dimensions);
                 breakdownDims.put("result", result.toString());
                 breakdownDims.put("reason", reason.toString());
                 Metrics breakdownMetric = Metrics.create(breakdownDims);
-                breakdownMetric.put("brk_lb_bundles_split_breakdown_total", count);
+                breakdownMetric.put("brk_lb_bundles_split_breakdown_total", count.get());
                 metrics.add(breakdownMetric);
             }
         }
@@ -95,4 +100,7 @@ public class SplitCounter {
         return metrics;
     }
 
+    public long updatedAt() {
+        return updatedAt;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java
index a3dede50c1c..433d21a5a61 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java
@@ -19,9 +19,7 @@
 package org.apache.pulsar.broker.loadbalance.extensions.models;
 
 import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
-import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Skip;
 import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
-import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Balanced;
 import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
 import lombok.Data;
 
@@ -36,7 +34,6 @@ public class SplitDecision {
 
     public enum Label {
         Success,
-        Skip,
         Failure
     }
 
@@ -46,7 +43,6 @@ public class SplitDecision {
         MsgRate,
         Bandwidth,
         Admin,
-        Balanced,
         Unknown
     }
 
@@ -62,11 +58,6 @@ public class SplitDecision {
         reason = null;
     }
 
-    public void skip() {
-        label = Skip;
-        reason = Balanced;
-    }
-
     public void succeed(Reason reason) {
         label = Success;
         this.reason = reason;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java
new file mode 100644
index 00000000000..589df80fc5c
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.broker.loadbalance.extensions.strategy.DefaultNamespaceBundleSplitStrategyImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.strategy.NamespaceBundleSplitStrategy;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.FutureUtil;
+
+/**
+ * Service Unit(e.g. bundles) Split scheduler.
+ */
+@Slf4j
+public class SplitScheduler implements LoadManagerScheduler {
+
+    private final PulsarService pulsar;
+
+    private final ScheduledExecutorService loadManagerExecutor;
+
+    private final LoadManagerContext context;
+
+    private final ServiceConfiguration conf;
+
+    private final ServiceUnitStateChannel serviceUnitStateChannel;
+
+    private final NamespaceBundleSplitStrategy bundleSplitStrategy;
+
+    private final SplitCounter counter;
+
+    private final SplitManager splitManager;
+
+    private final AtomicReference<List<Metrics>> splitMetrics;
+
+    private volatile ScheduledFuture<?> task;
+
+    private long counterLastUpdatedAt = 0;
+
+    public SplitScheduler(PulsarService pulsar,
+                          ServiceUnitStateChannel serviceUnitStateChannel,
+                          SplitManager splitManager,
+                          SplitCounter counter,
+                          AtomicReference<List<Metrics>> splitMetrics,
+                          LoadManagerContext context,
+                          NamespaceBundleSplitStrategy bundleSplitStrategy) {
+        this.pulsar = pulsar;
+        this.loadManagerExecutor = pulsar.getLoadManagerExecutor();
+        this.splitManager = splitManager;
+        this.counter = counter;
+        this.splitMetrics = splitMetrics;
+        this.context = context;
+        this.conf = pulsar.getConfiguration();
+        this.bundleSplitStrategy = bundleSplitStrategy;
+        this.serviceUnitStateChannel = serviceUnitStateChannel;
+    }
+
+    public SplitScheduler(PulsarService pulsar,
+                          ServiceUnitStateChannel serviceUnitStateChannel,
+                          SplitManager splitManager,
+                          SplitCounter counter,
+                          AtomicReference<List<Metrics>> splitMetrics,
+                          LoadManagerContext context) {
+        this(pulsar, serviceUnitStateChannel, splitManager, counter, splitMetrics, context,
+                new DefaultNamespaceBundleSplitStrategyImpl(counter));
+    }
+
+    @Override
+    public void execute() {
+        boolean debugMode = conf.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
+        if (debugMode) {
+            log.info("Load balancer enabled: {}, Split enabled: {}.",
+                    conf.isLoadBalancerEnabled(), conf.isLoadBalancerAutoBundleSplitEnabled());
+        }
+
+        if (!isLoadBalancerAutoBundleSplitEnabled()) {
+            if (debugMode) {
+                log.info("The load balancer or load balancer split already disabled. Skipping.");
+            }
+            return;
+        }
+
+        synchronized (bundleSplitStrategy) {
+            final Set<SplitDecision> decisions = bundleSplitStrategy.findBundlesToSplit(context, pulsar);
+            if (!decisions.isEmpty()) {
+
+                // currently following the unloading timeout
+                var asyncOpTimeoutMs = conf.getNamespaceBundleUnloadingTimeoutMs();
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                for (SplitDecision decision : decisions) {
+                    if (decision.getLabel() == Success) {
+                        var split = decision.getSplit();
+                        futures.add(
+                                splitManager.waitAsync(
+                                        serviceUnitStateChannel.publishSplitEventAsync(split),
+                                        split.serviceUnit(),
+                                        decision,
+                                        asyncOpTimeoutMs, TimeUnit.MILLISECONDS)
+                        );
+                    }
+                }
+                try {
+                    FutureUtil.waitForAll(futures)
+                            .get(asyncOpTimeoutMs, TimeUnit.MILLISECONDS);
+                } catch (Throwable e) {
+                    log.error("Failed to wait for split events to persist.", e);
+                }
+            } else {
+                if (debugMode) {
+                    log.info("BundleSplitStrategy returned no bundles to split.");
+                }
+            }
+        }
+
+        if (counter.updatedAt() > counterLastUpdatedAt) {
+            splitMetrics.set(counter.toMetrics(pulsar.getAdvertisedAddress()));
+            counterLastUpdatedAt = counter.updatedAt();
+        }
+    }
+
+    @Override
+    public void start() {
+        long interval = TimeUnit.MINUTES
+                .toMillis(conf.getLoadBalancerSplitIntervalMinutes());
+        task = loadManagerExecutor.scheduleAtFixedRate(() -> {
+            try {
+                execute();
+            } catch (Throwable e) {
+                log.error("Failed to run the split job.", e);
+            }
+        }, interval, interval, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() {
+        if (task != null) {
+            task.cancel(false);
+            task = null;
+        }
+    }
+
+    private boolean isLoadBalancerAutoBundleSplitEnabled() {
+        return conf.isLoadBalancerEnabled() && conf.isLoadBalancerAutoBundleSplitEnabled();
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
new file mode 100644
index 00000000000..e572fd4161b
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.strategy;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Topics;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+
+/**
+ * Determines which bundles should be split based on various thresholds.
+ *
+ * Migrate from {@link org.apache.pulsar.broker.loadbalance.impl.BundleSplitterTask}
+ *
+ * A bundle is selected once it has exceeded at least one of the configured limits (topics, sessions,
+ * message rate, bandwidth) in more than {@code getLoadBalancerNamespaceBundleSplitConditionThreshold()}
+ * evaluations, and only while its namespace is below {@code getLoadBalancerNamespaceMaximumBundles()}.
+ *
+ * Instances keep their state in plain {@link HashMap}/{@link HashSet} fields and do no synchronization
+ * of their own.
+ */
+@Slf4j
+public class DefaultNamespaceBundleSplitStrategyImpl implements NamespaceBundleSplitStrategy {
+    // Decisions of the current call; cleared at the start of every call and returned to the caller as-is.
+    private final Set<SplitDecision> decisionCache;
+    // Namespace -> number of splits already decided for it during the current call.
+    private final Map<String, Integer> namespaceBundleCount;
+    // Bundle -> number of evaluations in which it exceeded a limit; kept across calls.
+    private final Map<String, Integer> bundleHighTrafficFrequency;
+    // Receives a Failure/Unknown update whenever a bundle cannot be evaluated or split.
+    private final SplitCounter counter;
+
+    /**
+     * @param counter the counter that is updated when a bundle cannot be split or its evaluation fails
+     */
+    public DefaultNamespaceBundleSplitStrategyImpl(SplitCounter counter) {
+        decisionCache = new HashSet<>();
+        namespaceBundleCount = new HashMap<>();
+        bundleHighTrafficFrequency = new HashMap<>();
+        this.counter = counter;
+    }
+
+    /**
+     * Scans the bundle stats reported by the broker service and picks the bundles to split in this cycle.
+     *
+     * @param context used to look up the id of the broker recorded in each {@link Split}
+     * @param pulsar  source of the configuration, the bundle stats and the namespace service
+     * @return the decisions made by this call. This is the strategy's internal set, which is cleared at the
+     *         start of the next call, so it must be consumed before this method is invoked again.
+     */
+    @Override
+    public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarService pulsar) {
+        decisionCache.clear();
+        namespaceBundleCount.clear();
+        final ServiceConfiguration conf = pulsar.getConfiguration();
+        int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles();
+        long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics();
+        long maxBundleSessions = conf.getLoadBalancerNamespaceBundleMaxSessions();
+        long maxBundleMsgRate = conf.getLoadBalancerNamespaceBundleMaxMsgRate();
+        // Widen before multiplying: if both operands are ints, a large enough limit would overflow int
+        // arithmetic before the product is stored in the long.
+        long maxBundleBandwidth =
+                (long) conf.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * LoadManagerShared.MIBI;
+        long maxSplitCount = conf.getLoadBalancerMaxNumberOfBundlesToSplitPerCycle();
+        long splitConditionThreshold = conf.getLoadBalancerNamespaceBundleSplitConditionThreshold();
+        boolean debug = log.isDebugEnabled() || conf.isLoadBalancerDebugModeEnabled();
+
+        Map<String, NamespaceBundleStats> bundleStatsMap = pulsar.getBrokerService().getBundleStats();
+        NamespaceBundleFactory namespaceBundleFactory =
+                pulsar.getNamespaceService().getNamespaceBundleFactory();
+
+        // clean bundleHighTrafficFrequency: forget bundles that are no longer present in the stats
+        bundleHighTrafficFrequency.keySet().retainAll(bundleStatsMap.keySet());
+
+        for (var entry : bundleStatsMap.entrySet()) {
+            final String bundle = entry.getKey();
+            final NamespaceBundleStats stats = entry.getValue();
+            if (stats.topics < 2) {
+                if (debug) {
+                    log.info("The count of topics on the bundle {} is less than 2, skip split!", bundle);
+                }
+                continue;
+            }
+
+            final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+            final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
+            if (!namespaceBundleFactory
+                    .canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) {
+                if (debug) {
+                    log.info("Can't split the bundle:{}. invalid bundle range:{}. ", bundle, bundleRange);
+                }
+                counter.update(Failure, Unknown);
+                continue;
+            }
+
+            double totalMessageRate = stats.msgRateIn + stats.msgRateOut;
+            double totalMessageThroughput = stats.msgThroughputIn + stats.msgThroughputOut;
+            int totalSessionCount = stats.consumerCount + stats.producerCount;
+            // The first limit that is exceeded, in this order, becomes the reason of the decision.
+            SplitDecision.Reason reason = Unknown;
+            if (stats.topics > maxBundleTopics) {
+                reason = Topics;
+            } else if (maxBundleSessions > 0 && (totalSessionCount > maxBundleSessions)) {
+                // The session limit is only applied when it is configured to a positive value.
+                reason = Sessions;
+            } else if (totalMessageRate > maxBundleMsgRate) {
+                reason = MsgRate;
+            } else if (totalMessageThroughput > maxBundleBandwidth) {
+                reason = Bandwidth;
+            }
+
+            // Count the hit, or start over as soon as the bundle is back under every limit.
+            if (reason != Unknown) {
+                bundleHighTrafficFrequency.put(bundle, bundleHighTrafficFrequency.getOrDefault(bundle, 0) + 1);
+            } else {
+                bundleHighTrafficFrequency.remove(bundle);
+            }
+
+            // Strictly greater: a bundle needs threshold + 1 counted hits before it is considered.
+            if (bundleHighTrafficFrequency.getOrDefault(bundle, 0) > splitConditionThreshold) {
+                try {
+                    final int bundleCount = pulsar.getNamespaceService()
+                            .getBundleCount(NamespaceName.get(namespaceName));
+                    // Splits already decided in this call are added to the namespace's current bundle count.
+                    if ((bundleCount + namespaceBundleCount.getOrDefault(namespaceName, 0))
+                            < maxBundleCount) {
+                        if (debug) {
+                            log.info("The bundle {} is considered to split. Topics: {}/{}, Sessions: ({}+{})/{}, "
+                                            + "Message Rate: {}/{} (msgs/s), Message Throughput: {}/{} (MB/s)",
+                                    bundle, stats.topics, maxBundleTopics, stats.producerCount, stats.consumerCount,
+                                    maxBundleSessions, totalMessageRate, maxBundleMsgRate,
+                                    totalMessageThroughput / LoadManagerShared.MIBI,
+                                    maxBundleBandwidth / LoadManagerShared.MIBI);
+                        }
+                        var decision = new SplitDecision();
+                        decision.setSplit(new Split(bundle, context.brokerRegistry().getBrokerId(), new HashMap<>()));
+                        decision.succeed(reason);
+                        decisionCache.add(decision);
+                        int bundleNum = namespaceBundleCount.getOrDefault(namespaceName, 0);
+                        namespaceBundleCount.put(namespaceName, bundleNum + 1);
+                        bundleHighTrafficFrequency.remove(bundle);
+                        // Clear namespace bundle-cache
+                        namespaceBundleFactory.invalidateBundleCache(NamespaceName.get(namespaceName));
+                        if (decisionCache.size() == maxSplitCount) {
+                            if (debug) {
+                                log.info("Too many bundles to split in this split cycle {} / {}. Stop.",
+                                        decisionCache.size(), maxSplitCount);
+                            }
+                            break;
+                        }
+                    } else {
+                        if (debug) {
+                            log.info(
+                                    "Could not split namespace bundle {} because namespace {} has too many bundles:"
+                                            + "{}", bundle, namespaceName, bundleCount);
+                        }
+                    }
+                } catch (Exception e) {
+                    // One namespace failing must not abort the scan of the remaining bundles.
+                    counter.update(Failure, Unknown);
+                    log.warn("Error while computing bundle splits for namespace {}", namespaceName, e);
+                }
+            }
+        }
+        return decisionCache;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/NamespaceBundleSplitStrategy.java
similarity index 86%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/NamespaceBundleSplitStrategy.java
index 88bd7f0b087..14023f1b5b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/NamespaceBundleSplitStrategy.java
@@ -16,11 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+package org.apache.pulsar.broker.loadbalance.extensions.strategy;
 
 import java.util.Set;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
-import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
 
 /**
  * Determines which bundles should be split based on various thresholds.
@@ -35,5 +36,5 @@ public interface NamespaceBundleSplitStrategy {
      * @param context The context used for decisions.
      * @return A set of the bundles that should be split.
      */
-    Set<Split> findBundlesToSplit(LoadManagerContext context);
+    Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarService pulsar);
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 441415a9d35..17fcb995261 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -464,24 +464,16 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
                     FieldUtils.readDeclaredField(primaryLoadManager, "splitMetrics", true);
             SplitCounter splitCounter = new SplitCounter();
             FieldUtils.writeDeclaredField(splitCounter, "splitCount", 35l, true);
-            FieldUtils.writeDeclaredField(splitCounter, "breakdownCounters", new LinkedHashMap<>() {
-                {
-                    put(SplitDecision.Label.Success, new LinkedHashMap<>() {
-                        {
-                            put(Topics, new MutableLong(1));
-                            put(Sessions, new MutableLong(2));
-                            put(MsgRate, new MutableLong(3));
-                            put(Bandwidth, new MutableLong(4));
-                            put(Admin, new MutableLong(5));
-                        }
-                    });
-                    put(SplitDecision.Label.Skip, Map.of(
-                            SplitDecision.Reason.Balanced, new MutableLong(6)
-                    ));
-                    put(SplitDecision.Label.Failure, Map.of(
-                            SplitDecision.Reason.Unknown, new MutableLong(7)));
-                }
-            }, true);
+            FieldUtils.writeDeclaredField(splitCounter, "breakdownCounters", Map.of(
+                    SplitDecision.Label.Success, Map.of(
+                            Topics, new AtomicLong(1),
+                            Sessions, new AtomicLong(2),
+                            MsgRate, new AtomicLong(3),
+                            Bandwidth, new AtomicLong(4),
+                            Admin, new AtomicLong(5)),
+                    SplitDecision.Label.Failure, Map.of(
+                            SplitDecision.Reason.Unknown, new AtomicLong(6))
+            ), true);
             splitMetrics.set(splitCounter.toMetrics(pulsar.getAdvertisedAddress()));
         }
 
@@ -556,8 +548,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
                         dimensions=[{broker=localhost, metric=bundlesSplit, reason=MsgRate, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=3}]
                         dimensions=[{broker=localhost, metric=bundlesSplit, reason=Bandwidth, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=4}]
                         dimensions=[{broker=localhost, metric=bundlesSplit, reason=Admin, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=5}]
-                        dimensions=[{broker=localhost, metric=bundlesSplit, reason=Balanced, result=Skip}], metrics=[{brk_lb_bundles_split_breakdown_total=6}]
-                        dimensions=[{broker=localhost, metric=bundlesSplit, reason=Unknown, result=Failure}], metrics=[{brk_lb_bundles_split_breakdown_total=7}]
+                        dimensions=[{broker=localhost, metric=bundlesSplit, reason=Unknown, result=Failure}], metrics=[{brk_lb_bundles_split_breakdown_total=6}]
                         dimensions=[{broker=localhost, metric=assign, result=Empty}], metrics=[{brk_lb_assign_broker_breakdown_total=2}]
                         dimensions=[{broker=localhost, metric=assign, result=Skip}], metrics=[{brk_lb_assign_broker_breakdown_total=3}]
                         dimensions=[{broker=localhost, metric=assign, result=Success}], metrics=[{brk_lb_assign_broker_breakdown_total=1}]
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java
new file mode 100644
index 00000000000..3287306ab48
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.VERSION_ID_INIT;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link SplitManager}: how a pending split wait is completed (or failed) by publish
+ * failures, timeouts, service unit state events and {@code close()}, and how the counter is updated.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class SplitManagerTest {
+
+    String bundle = "bundle-1";
+
+    String dstBroker = "broker-1";
+
+    /** A failed publish future fails the wait right away and is counted as one Failure/Unknown. */
+    @Test
+    public void testEventPubFutureHasException() {
+        var actualCounter = new SplitCounter();
+        var splitManager = new SplitManager(actualCounter);
+        var splitDecision = new SplitDecision();
+        CompletableFuture<Void> pending = splitManager.waitAsync(
+                FutureUtil.failedFuture(new Exception("test")),
+                bundle, splitDecision, 10, TimeUnit.SECONDS);
+
+        assertTrue(pending.isCompletedExceptionally());
+        assertEquals(failureCause(pending).getMessage(), "test");
+        assertSameMetrics(actualCounter, singleUnknownFailure());
+    }
+
+    /** Without any state event the wait ends with a timeout and is counted as one Failure/Unknown. */
+    @Test
+    public void testTimeout() throws IllegalAccessException {
+        var actualCounter = new SplitCounter();
+        var splitManager = new SplitManager(actualCounter);
+        CompletableFuture<Void> pending = awaitSplit(splitManager, new SplitDecision(), 3);
+        var inFlight = inFlightSplitRequests(splitManager);
+        assertEquals(inFlight.size(), 1);
+
+        assertTrue(failureCause(pending) instanceof TimeoutException);
+
+        assertEquals(inFlight.size(), 0);
+        assertSameMetrics(actualCounter, singleUnknownFailure());
+    }
+
+    /** Deleted, Init and Owned events complete the wait successfully; other states leave it pending. */
+    @Test
+    public void testSuccess() throws IllegalAccessException, ExecutionException, InterruptedException {
+        var actualCounter = new SplitCounter();
+        var splitManager = new SplitManager(actualCounter);
+        var expectedCounter = new SplitCounter();
+        var splitDecision = new SplitDecision();
+        splitDecision.succeed(Sessions);
+
+        CompletableFuture<Void> pending = awaitSplit(splitManager, splitDecision, 5);
+        var inFlight = inFlightSplitRequests(splitManager);
+        assertEquals(inFlight.size(), 1);
+
+        // None of these states ends the wait.
+        deliver(splitManager, ServiceUnitState.Assigning);
+        assertEquals(inFlight.size(), 1);
+
+        deliver(splitManager, ServiceUnitState.Splitting);
+        assertEquals(inFlight.size(), 1);
+
+        deliver(splitManager, ServiceUnitState.Releasing);
+        assertEquals(inFlight.size(), 1);
+
+        deliver(splitManager, ServiceUnitState.Free);
+        assertEquals(inFlight.size(), 1);
+        assertSameMetrics(actualCounter, expectedCounter);
+
+        // Success with Deleted state.
+        deliver(splitManager, ServiceUnitState.Deleted);
+        expectedCounter.update(SplitDecision.Label.Success, Sessions);
+        assertEquals(inFlight.size(), 0);
+        assertSameMetrics(actualCounter, expectedCounter);
+
+        // Success with Init state.
+        pending = awaitSplit(splitManager, splitDecision, 5);
+        inFlight = inFlightSplitRequests(splitManager);
+        assertEquals(inFlight.size(), 1);
+        deliver(splitManager, ServiceUnitState.Init);
+        assertEquals(inFlight.size(), 0);
+        expectedCounter.update(SplitDecision.Label.Success, Sessions);
+        assertSameMetrics(actualCounter, expectedCounter);
+        pending.get();
+
+        // Success with Owned state.
+        pending = awaitSplit(splitManager, splitDecision, 5);
+        inFlight = inFlightSplitRequests(splitManager);
+        assertEquals(inFlight.size(), 1);
+        deliver(splitManager, ServiceUnitState.Owned);
+        assertEquals(inFlight.size(), 0);
+        expectedCounter.update(SplitDecision.Label.Success, Sessions);
+        assertSameMetrics(actualCounter, expectedCounter);
+        pending.get();
+    }
+
+    /** An event that carries an exception fails the wait with that exception. */
+    @Test
+    public void testFailedStage() throws IllegalAccessException {
+        var actualCounter = new SplitCounter();
+        var splitManager = new SplitManager(actualCounter);
+        CompletableFuture<Void> pending = awaitSplit(splitManager, new SplitDecision(), 5);
+        var inFlight = inFlightSplitRequests(splitManager);
+        assertEquals(inFlight.size(), 1);
+
+        var ownedData = new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT);
+        splitManager.handleEvent(bundle, ownedData, new IllegalStateException("Failed stage."));
+
+        Throwable cause = failureCause(pending);
+        assertTrue(cause instanceof IllegalStateException);
+        assertEquals(cause.getMessage(), "Failed stage.");
+
+        assertEquals(inFlight.size(), 0);
+        assertSameMetrics(actualCounter, singleUnknownFailure());
+    }
+
+    /** Closing the manager drops the pending request and fails it with an IllegalStateException. */
+    @Test
+    public void testClose() throws IllegalAccessException {
+        var splitManager = new SplitManager(new SplitCounter());
+        CompletableFuture<Void> pending = awaitSplit(splitManager, new SplitDecision(), 5);
+        var inFlight = inFlightSplitRequests(splitManager);
+        assertEquals(inFlight.size(), 1);
+
+        splitManager.close();
+        assertEquals(inFlight.size(), 0);
+
+        assertTrue(failureCause(pending) instanceof IllegalStateException);
+    }
+
+    /** Starts waiting for the split of {@code bundle} behind an already completed publish future. */
+    private CompletableFuture<Void> awaitSplit(SplitManager manager, SplitDecision decision, long timeoutSeconds) {
+        return manager.waitAsync(CompletableFuture.completedFuture(null),
+                bundle, decision, timeoutSeconds, TimeUnit.SECONDS);
+    }
+
+    /** Sends a state event for {@code bundle} that carries no exception. */
+    private void deliver(SplitManager manager, ServiceUnitState state) {
+        manager.handleEvent(bundle, new ServiceUnitStateData(state, dstBroker, VERSION_ID_INIT), null);
+    }
+
+    /** Requires {@code future.get()} to throw and returns the cause of the thrown exception. */
+    private static Throwable failureCause(CompletableFuture<Void> future) {
+        try {
+            future.get();
+            fail();
+        } catch (Exception ex) {
+            return ex.getCause();
+        }
+        // fail() always throws, so control never gets here; the compiler cannot know that.
+        throw new AssertionError("unreachable");
+    }
+
+    /** Builds the counter expected after exactly one Failure/Unknown update. */
+    private static SplitCounter singleUnknownFailure() {
+        var expected = new SplitCounter();
+        expected.update(SplitDecision.Label.Failure, Unknown);
+        return expected;
+    }
+
+    /** Compares two counters through the string form of their metrics. */
+    private static void assertSameMetrics(SplitCounter actual, SplitCounter expected) {
+        assertEquals(actual.toMetrics(null).toString(),
+                expected.toMetrics(null).toString());
+    }
+
+    /** Reads the manager's private {@code inFlightSplitRequests} map through reflection. */
+    @SuppressWarnings("unchecked")
+    private Map<String, CompletableFuture<Void>> inFlightSplitRequests(SplitManager manager)
+            throws IllegalAccessException {
+        return (Map<String, CompletableFuture<Void>>) FieldUtils.readField(manager, "inFlightSplitRequests", true);
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java
new file mode 100644
index 00000000000..7988aa41336
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.strategy.NamespaceBundleSplitStrategy;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.stats.Metrics;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class SplitSchedulerTest {
+
+    PulsarService pulsar;
+    ServiceConfiguration config;
+    NamespaceBundleFactory namespaceBundleFactory;
+    LoadManagerContext context;
+    ServiceUnitStateChannel channel;
+    NamespaceBundleSplitStrategy strategy;
+    String bundle1 = "tenant/namespace/0x00000000_0xFFFFFFFF";
+    String bundle2 = "tenant/namespace/0x00000000_0x0FFFFFFF";
+    String broker = "broker-1";
+    SplitDecision decision1;
+    SplitDecision decision2;
+
+    @BeforeMethod
+    public void setUp() {
+
+        config = new ServiceConfiguration();
+        config.setLoadBalancerDebugModeEnabled(true);
+
+        pulsar = mock(PulsarService.class);
+        namespaceBundleFactory = mock(NamespaceBundleFactory.class);
+        context = mock(LoadManagerContext.class);
+        channel = mock(ServiceUnitStateChannel.class);
+        strategy = mock(NamespaceBundleSplitStrategy.class);
+
+        doReturn(config).when(pulsar).getConfiguration();
+        doReturn(true).when(namespaceBundleFactory).canSplitBundle(any());
+        doReturn(CompletableFuture.completedFuture(null)).when(channel).publishSplitEventAsync(any());
+
+        decision1 = new SplitDecision();
+        decision1.setSplit(new Split(bundle1, broker, new HashMap<>()));
+        decision1.succeed(SplitDecision.Reason.MsgRate);
+
+        decision2 = new SplitDecision();
+        decision2.setSplit(new Split(bundle2, broker, new HashMap<>()));
+        decision2.succeed(SplitDecision.Reason.Sessions);
+        Set<SplitDecision> decisions = Set.of(decision1, decision2);
+        doReturn(decisions).when(strategy).findBundlesToSplit(any(), any());
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testExecuteSuccess() {
+        AtomicReference<List<Metrics>> reference = new AtomicReference();
+        SplitCounter counter = new SplitCounter();
+        SplitManager manager = mock(SplitManager.class);
+        SplitScheduler scheduler = new SplitScheduler(pulsar, channel, manager, counter, reference, context, strategy);
+        doAnswer((invocation)->{
+            var decision = invocation.getArgument(2, SplitDecision.class);
+            counter.update(decision);
+            return CompletableFuture.completedFuture(null);
+        }).when(manager).waitAsync(any(), any(), any(), anyLong(), any());
+        scheduler.execute();
+
+        var counterExpected = new SplitCounter();
+        counterExpected.update(decision1);
+        counterExpected.update(decision2);
+        verify(channel, times(1)).publishSplitEventAsync(eq(decision1.getSplit()));
+        verify(channel, times(1)).publishSplitEventAsync(eq(decision2.getSplit()));
+
+        assertEquals(reference.get().toString(), counterExpected.toMetrics(pulsar.getAdvertisedAddress()).toString());
+
+        // Test empty splits.
+        Set<SplitDecision> emptyUnload = Set.of();
+        doReturn(emptyUnload).when(strategy).findBundlesToSplit(any(), any());
+
+        scheduler.execute();
+        verify(channel, times(2)).publishSplitEventAsync(any());
+        assertEquals(reference.get().toString(), counterExpected.toMetrics(pulsar.getAdvertisedAddress()).toString());
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testExecuteFailure() {
+        AtomicReference<List<Metrics>> reference = new AtomicReference();
+        SplitCounter counter = new SplitCounter();
+        SplitManager manager = new SplitManager(counter);
+        SplitScheduler scheduler = new SplitScheduler(pulsar, channel, manager, counter, reference, context, strategy);
+        doReturn(CompletableFuture.failedFuture(new RuntimeException())).when(channel).publishSplitEventAsync(any());
+
+        scheduler.execute();
+
+
+        var counterExpected = new SplitCounter();
+        counterExpected.update(SplitDecision.Label.Failure, SplitDecision.Reason.Unknown);
+        counterExpected.update(SplitDecision.Label.Failure, SplitDecision.Reason.Unknown);
+        verify(channel, times(1)).publishSplitEventAsync(eq(decision1.getSplit()));
+        verify(channel, times(1)).publishSplitEventAsync(eq(decision2.getSplit()));
+
+        assertEquals(reference.get().toString(), counterExpected.toMetrics(pulsar.getAdvertisedAddress()).toString());
+    }
+
+
+    @Test(timeOut = 30 * 1000)
+    public void testDisableLoadBalancer() {
+
+        config.setLoadBalancerEnabled(false);
+        SplitScheduler scheduler = new SplitScheduler(pulsar, channel, null, null, null, context, strategy);
+
+        scheduler.execute();
+
+        verify(strategy, times(0)).findBundlesToSplit(any(), any());
+
+        config.setLoadBalancerEnabled(true);
+        config.setLoadBalancerAutoBundleSplitEnabled(false);
+        scheduler.execute();
+
+        verify(strategy, times(0)).findBundlesToSplit(any(), any());
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
new file mode 100644
index 00000000000..71606bb85a3
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.strategy;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.PulsarStats;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link DefaultNamespaceBundleSplitStrategyImpl}.
+ *
+ * <p>All collaborators ({@link PulsarService}, {@link BrokerService}, {@link NamespaceService},
+ * {@link NamespaceBundleFactory}, {@link LoadManagerContext}, {@link BrokerRegistry}, {@link PulsarStats}) are
+ * Mockito mocks. Only the {@link ServiceConfiguration} and the bundle stats map are real objects, so each test
+ * drives the strategy by adjusting configuration limits and per-bundle stats.
+ *
+ * <p>The class-level {@code @Test} annotation turns every public method into a test; the private helpers at the
+ * bottom of the class are therefore not run as tests.
+ */
+@Test(groups = "broker")
+public class DefaultNamespaceBundleSplitStrategyTest {
+
+    PulsarService pulsar;
+    BrokerService brokerService;
+    PulsarStats pulsarStats;
+    // Stats returned by the mocked BrokerService; a LinkedHashMap so bundle1 is always iterated before bundle2.
+    Map<String, NamespaceBundleStats> bundleStats;
+    ServiceConfiguration config;
+    NamespaceBundleFactory namespaceBundleFactory;
+    NamespaceService namespaceService;
+
+    LoadManagerContext loadManagerContext;
+
+    BrokerRegistry brokerRegistry;
+
+    String bundle1 = "tenant/namespace/0x00000000_0xFFFFFFFF";
+    String bundle2 = "tenant/namespace/0x00000000_0x0FFFFFFF";
+
+    String broker = "broker-1";
+
+    /**
+     * Builds a fresh configuration, fresh mocks and a two-bundle stats map before every test.
+     *
+     * <p>All per-bundle limits are set to 100, at most one bundle may be split per cycle, and the split condition
+     * threshold is 3. Both bundles start with 5 topics and otherwise default stats.
+     */
+    @BeforeMethod
+    void setup() {
+        config = new ServiceConfiguration();
+        config.setLoadBalancerDebugModeEnabled(true);
+        config.setLoadBalancerNamespaceMaximumBundles(100);
+        config.setLoadBalancerNamespaceBundleMaxTopics(100);
+        config.setLoadBalancerNamespaceBundleMaxSessions(100);
+        config.setLoadBalancerNamespaceBundleMaxMsgRate(100);
+        config.setLoadBalancerNamespaceBundleMaxBandwidthMbytes(100);
+        config.setLoadBalancerMaxNumberOfBundlesToSplitPerCycle(1);
+        config.setLoadBalancerNamespaceBundleSplitConditionThreshold(3);
+
+        pulsar = mock(PulsarService.class);
+        brokerService = mock(BrokerService.class);
+        pulsarStats = mock(PulsarStats.class);
+        namespaceService = mock(NamespaceService.class);
+        namespaceBundleFactory = mock(NamespaceBundleFactory.class);
+        loadManagerContext = mock(LoadManagerContext.class);
+        brokerRegistry = mock(BrokerRegistry.class);
+
+        // Wire the mocks together so the strategy can reach config, stats, namespace info and the broker id.
+        doReturn(brokerService).when(pulsar).getBrokerService();
+        doReturn(config).when(pulsar).getConfiguration();
+        doReturn(pulsarStats).when(brokerService).getPulsarStats();
+        doReturn(namespaceService).when(pulsar).getNamespaceService();
+        doReturn(namespaceBundleFactory).when(namespaceService).getNamespaceBundleFactory();
+        doReturn(true).when(namespaceBundleFactory).canSplitBundle(any());
+        doReturn(brokerRegistry).when(loadManagerContext).brokerRegistry();
+        doReturn(broker).when(brokerRegistry).getBrokerId();
+
+        bundleStats = new LinkedHashMap<>();
+        NamespaceBundleStats stats1 = new NamespaceBundleStats();
+        stats1.topics = 5;
+        bundleStats.put(bundle1, stats1);
+        NamespaceBundleStats stats2 = new NamespaceBundleStats();
+        stats2.topics = 5;
+        bundleStats.put(bundle2, stats2);
+        doReturn(bundleStats).when(brokerService).getBundleStats();
+    }
+
+    /**
+     * With a zero threshold and both bundles over the message-rate limit, a single call yields exactly one
+     * decision (presumably capped by the max-bundles-to-split-per-cycle setting of 1 from {@link #setup()}).
+     */
+    public void testNamespaceBundleSplitConditionThreshold() {
+        exceedMsgRateWithZeroThreshold();
+        var strategy = new DefaultNamespaceBundleSplitStrategyImpl(new SplitCounter());
+        var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar);
+        assertEquals(actual.size(), 1);
+    }
+
+    /** A bundle holding a single topic is not selected even though its message rate is over the limit. */
+    public void testNotEnoughTopics() {
+        exceedMsgRateWithZeroThreshold();
+        var strategy = new DefaultNamespaceBundleSplitStrategyImpl(new SplitCounter());
+        bundleStats.values().forEach(stats -> stats.topics = 1);
+        var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar);
+        assertEquals(actual, Set.of());
+    }
+
+    /** No bundle is selected once the namespace already reports the configured maximum number of bundles. */
+    public void testNamespaceMaximumBundles() throws Exception {
+        exceedMsgRateWithZeroThreshold();
+        var strategy = new DefaultNamespaceBundleSplitStrategyImpl(new SplitCounter());
+        doReturn(config.getLoadBalancerNamespaceMaximumBundles()).when(namespaceService).getBundleCount(any());
+        var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar);
+        assertEquals(actual, Set.of());
+    }
+
+    /** An empty stats map produces an empty result. */
+    public void testEmptyBundleStats() {
+        exceedMsgRateWithZeroThreshold();
+        var strategy = new DefaultNamespaceBundleSplitStrategyImpl(new SplitCounter());
+        bundleStats.clear();
+        var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar);
+        assertEquals(actual, Set.of());
+    }
+
+    /**
+     * When {@code getBundleCount} throws, nothing is selected and the counter records a
+     * {@code Failure}/{@code Unknown} update exactly twice (presumably once per bundle in the stats map).
+     */
+    public void testError() throws Exception {
+        var counter = spy(new SplitCounter());
+        exceedMsgRateWithZeroThreshold();
+        var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter);
+        doThrow(new RuntimeException()).when(namespaceService).getBundleCount(any());
+        var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar);
+        assertEquals(actual, Set.of());
+        verify(counter, times(2)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
+    }
+
+    /**
+     * Inbound and outbound message rates are each set to half the limit plus one; the expected split reason is
+     * {@code MsgRate}. NOTE(review): presumably the strategy compares in + out against the limit - confirm.
+     */
+    public void testMaxMsgRate() {
+        var counter = spy(new SplitCounter());
+        var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter);
+        bundleStats.values().forEach(stats -> {
+            stats.msgRateOut = config.getLoadBalancerNamespaceBundleMaxMsgRate() / 2 + 1;
+            stats.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() / 2 + 1;
+        });
+        assertSplitsOnePerCycleAfterThreshold(strategy, counter, SplitDecision.Reason.MsgRate);
+    }
+
+    /** Topic count is set to the limit plus one; the expected split reason is {@code Topics}. */
+    public void testMaxTopics() {
+        var counter = spy(new SplitCounter());
+        var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter);
+        bundleStats.values().forEach(stats -> stats.topics = config.getLoadBalancerNamespaceBundleMaxTopics() + 1);
+        assertSplitsOnePerCycleAfterThreshold(strategy, counter, SplitDecision.Reason.Topics);
+    }
+
+    /**
+     * Producer and consumer counts are each set to half the limit plus one; the expected split reason is
+     * {@code Sessions}. NOTE(review): presumably the strategy compares producers + consumers - confirm.
+     */
+    public void testMaxSessions() {
+        var counter = spy(new SplitCounter());
+        var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter);
+        bundleStats.values().forEach(stats -> {
+            stats.producerCount = config.getLoadBalancerNamespaceBundleMaxSessions() / 2 + 1;
+            stats.consumerCount = config.getLoadBalancerNamespaceBundleMaxSessions() / 2 + 1;
+        });
+        assertSplitsOnePerCycleAfterThreshold(strategy, counter, SplitDecision.Reason.Sessions);
+    }
+
+    /**
+     * Inbound and outbound throughput are each set to half the configured limit (scaled by 1024 * 1024) plus one;
+     * the expected split reason is {@code Bandwidth}.
+     */
+    public void testMaxBandwidthMbytes() {
+        var counter = spy(new SplitCounter());
+        var strategy = new DefaultNamespaceBundleSplitStrategyImpl(counter);
+        bundleStats.values().forEach(stats -> {
+            stats.msgThroughputOut = config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * 1024 * 1024 / 2 + 1;
+            stats.msgThroughputIn = config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * 1024 * 1024 / 2 + 1;
+        });
+        assertSplitsOnePerCycleAfterThreshold(strategy, counter, SplitDecision.Reason.Bandwidth);
+    }
+
+    /**
+     * Shared preamble: sets the split condition threshold to 0 and raises every bundle's inbound message rate to
+     * the configured maximum plus one.
+     */
+    private void exceedMsgRateWithZeroThreshold() {
+        config.setLoadBalancerNamespaceBundleSplitConditionThreshold(0);
+        bundleStats.values().forEach(
+                stats -> stats.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1);
+    }
+
+    /**
+     * Builds the successful decision the strategy is expected to return for a bundle.
+     *
+     * @param bundle the bundle expected to be split
+     * @param reason the reason the decision is expected to carry
+     * @return a succeeded {@link SplitDecision} for {@code bundle} on the test broker with an empty map argument
+     */
+    private SplitDecision expectedDecision(String bundle, SplitDecision.Reason reason) {
+        SplitDecision decision = new SplitDecision();
+        decision.setSplit(new Split(bundle, broker, new HashMap<>()));
+        decision.succeed(reason);
+        return decision;
+    }
+
+    /**
+     * Calls the strategy {@code threshold + 2} times and checks the result of every call: empty for the first
+     * {@code threshold} calls, then a decision for {@code bundle1}, then a decision for {@code bundle2}.
+     * After every call the counter must still have no {@code Failure}/{@code Unknown} update.
+     *
+     * @param strategy the strategy under test, already constructed with {@code counter}
+     * @param counter  the spied counter handed to the strategy
+     * @param reason   the reason both expected decisions must carry
+     */
+    private void assertSplitsOnePerCycleAfterThreshold(DefaultNamespaceBundleSplitStrategyImpl strategy,
+                                                       SplitCounter counter,
+                                                       SplitDecision.Reason reason) {
+        int threshold = config.getLoadBalancerNamespaceBundleSplitConditionThreshold();
+        for (int i = 0; i < threshold + 2; i++) {
+            var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar);
+            if (i == threshold) {
+                assertEquals(actual, Set.of(expectedDecision(bundle1, reason)));
+            } else if (i == threshold + 1) {
+                assertEquals(actual, Set.of(expectedDecision(bundle2, reason)));
+            } else {
+                assertEquals(actual, Set.of());
+            }
+            verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
+        }
+    }
+
+}