You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2023/02/02 18:06:49 UTC

[pulsar] branch master updated: [improve][broker] PIP-220 Added TransferShedder (#18865)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4caf41a762d [improve][broker] PIP-220 Added TransferShedder (#18865)
4caf41a762d is described below

commit 4caf41a762d758db064020b551d6a2bae1b21c50
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Thu Feb 2 10:06:42 2023 -0800

    [improve][broker] PIP-220 Added TransferShedder (#18865)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  64 +++
 .../extensions/models/UnloadDecision.java          | 117 ++++
 .../scheduler/NamespaceUnloadStrategy.java         |  16 +-
 .../extensions/scheduler/TransferShedder.java      | 450 ++++++++++++++++
 .../extensions/data/BrokerLoadDataTest.java        |   2 +-
 .../extensions/scheduler/TransferShedderTest.java  | 599 +++++++++++++++++++++
 6 files changed, 1239 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 4a9a6e47bf3..557b30245af 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2450,6 +2450,70 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private long namespaceBundleUnloadingTimeoutMs = 60000;
 
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            dynamic = true,
+            doc = "Option to enable the debug mode for the load balancer logics. "
+                    + "The debug mode prints more logs to provide more information "
+                    + "such as load balance states and decisions. "
+                    + "(only used in load balancer extension logics)"
+    )
+    private boolean loadBalancerDebugModeEnabled = false;
+
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            dynamic = true,
+            doc = "The target standard deviation of the resource usage across brokers "
+                    + "(100% resource usage is 1.0 load). "
+                    + "The shedder logic tries to distribute bundle load across brokers to meet this target std. "
+                    + "The smaller value will incur load balancing more frequently. "
+                    + "(only used in load balancer extension TransferSheddeer)"
+    )
+    private double loadBalancerBrokerLoadTargetStd = 0.25;
+
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            dynamic = true,
+            doc = "Option to enable the bundle transfer mode when distributing bundle loads. "
+                    + "On: transfer bundles from overloaded brokers to underloaded "
+                    + "-- pre-assigns the destination broker upon unloading). "
+                    + "Off: unload bundles from overloaded brokers "
+                    + "-- post-assigns the destination broker upon lookups). "
+                    + "(only used in load balancer extension TransferSheddeer)"
+    )
+    private boolean loadBalancerTransferEnabled = true;
+
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            dynamic = true,
+            doc = "Maximum number of brokers to transfer bundle load for each unloading cycle. "
+                    + "The bigger value will incur more unloading/transfers for each unloading cycle. "
+                    + "(only used in load balancer extension TransferSheddeer)"
+    )
+    private int loadBalancerMaxNumberOfBrokerTransfersPerCycle = 3;
+
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            dynamic = true,
+            doc = "Delay (in seconds) to the next unloading cycle after unloading. "
+                    + "The logic tries to give enough time for brokers to recompute load after unloading. "
+                    + "The bigger value will delay the next unloading cycle longer. "
+                    + "(only used in load balancer extension TransferSheddeer)"
+    )
+    private long loadBalanceUnloadDelayInSeconds = 600;
+
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            dynamic = true,
+            doc = "Broker load data time to live (TTL in seconds). "
+                    + "The logic tries to avoid (possibly unavailable) brokers with out-dated load data, "
+                    + "and those brokers will be ignored in the load computation. "
+                    + "When tuning this value, please consider loadBalancerReportUpdateMaxIntervalMinutes. "
+                    + "The current default is loadBalancerReportUpdateMaxIntervalMinutes * 2. "
+                    + "(only used in load balancer extension TransferSheddeer)"
+    )
+    private long loadBalancerBrokerLoadDataTTLInSeconds = 1800;
+
     /**** --- Replication. --- ****/
     @FieldContext(
         category = CATEGORY_REPLICATION,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadDecision.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadDecision.java
new file mode 100644
index 00000000000..7d6651e3ff9
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadDecision.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.models;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Skip;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Balanced;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBundles;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoLoadData;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import lombok.Data;
+
+/**
+ * Defines the information required to unload or transfer a service unit(e.g. bundle).
+ */
+@Data
+public class UnloadDecision {
+    Multimap<String, Unload> unloads;
+    Label label;
+    Reason reason;
+    Double loadAvg;
+    Double loadStd;
+    public enum Label {
+        Success,
+        Skip,
+        Failure
+    }
+    public enum Reason {
+        Overloaded,
+        Underloaded,
+        Balanced,
+        NoBundles,
+        CoolDown,
+        OutDatedData,
+        NoLoadData,
+        NoBrokers,
+        Unknown
+    }
+
+    public UnloadDecision() {
+        unloads = ArrayListMultimap.create();
+        label = null;
+        reason = null;
+        loadAvg = null;
+        loadStd = null;
+    }
+
+    public void clear() {
+        unloads.clear();
+        label = null;
+        reason = null;
+        loadAvg = null;
+        loadStd = null;
+    }
+
+    public void skip(int numOfOverloadedBrokers,
+                     int numOfUnderloadedBrokers,
+                     int numOfBrokersWithEmptyLoadData,
+                     int numOfBrokersWithFewBundles) {
+        label = Skip;
+        if (numOfOverloadedBrokers == 0 && numOfUnderloadedBrokers == 0) {
+            reason = Balanced;
+        } else if (numOfBrokersWithEmptyLoadData > 0) {
+            reason = NoLoadData;
+        } else if (numOfBrokersWithFewBundles > 0) {
+            reason = NoBundles;
+        } else {
+            reason = Unknown;
+        }
+    }
+
+    public void skip(Reason reason) {
+        label = Skip;
+        this.reason = reason;
+    }
+
+    public void succeed(
+                        int numOfOverloadedBrokers,
+                        int numOfUnderloadedBrokers) {
+
+        label = Success;
+        if (numOfOverloadedBrokers > numOfUnderloadedBrokers) {
+            reason = Overloaded;
+        } else {
+            reason = Underloaded;
+        }
+    }
+
+
+    public void fail() {
+        label = Failure;
+        reason = Unknown;
+    }
+
+
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java
index 0942dc9522e..b4dc92d9218 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java
@@ -18,10 +18,9 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
 
-import java.util.List;
 import java.util.Map;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
-import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
+import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
 
 /**
  * The namespace unload strategy.
@@ -34,12 +33,13 @@ public interface NamespaceUnloadStrategy {
     /**
      * Recommend that all the returned bundles be unloaded.
      *
-     * @param context The context used for decisions.
-     * @param recentlyUnloadedBundles
-     *           The recently unloaded bundles.
-     * @return A list of the bundles that should be unloaded.
+     * @param context                 The context used for decisions.
+     * @param recentlyUnloadedBundles The recently unloaded bundles.
+     * @param recentlyUnloadedBrokers The recently unloaded brokers.
+     * @return unloadDecision containing a list of the bundles that should be unloaded.
      */
-    List<Unload> findBundlesForUnloading(LoadManagerContext context,
-                                         Map<String, Long> recentlyUnloadedBundles);
+    UnloadDecision findBundlesForUnloading(LoadManagerContext context,
+                                           Map<String, Long> recentlyUnloadedBundles,
+                                           Map<String, Long> recentlyUnloadedBrokers);
 
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
new file mode 100644
index 00000000000..a1047edc06f
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.CoolDown;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBrokers;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.OutDatedData;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.MinMaxPriorityQueue;
+import java.util.Map;
+import java.util.Optional;
+import lombok.Getter;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableDouble;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
+import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
+import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Load shedding strategy that unloads bundles from the highest loaded brokers.
+ * This strategy is only configurable in the broker load balancer extenstions introduced by
+ * PIP-192[https://github.com/apache/pulsar/issues/16691].
+ *
+ * This load shedding strategy has the following goals:
+ * 1. Distribute bundle load across brokers in order to make the standard deviation of the avg resource usage,
+ * std(exponential-moving-avg(max(cpu, memory, network, throughput)) for each broker) below the target,
+ * configurable by loadBalancerBrokerLoadTargetStd.
+ * 2. Use the transfer protocol to transfer bundle load from the highest loaded to the lowest loaded brokers,
+ * if configured by loadBalancerTransferEnabled=true.
+ * 3. Avoid repeated bundle unloading by recomputing historical broker resource usage after unloading and also
+ * skipping the bundles that are recently unloaded.
+ * 4. Prioritize unloading bundles to underloaded brokers when their message throughput is zero(new brokers).
+ * 5. Do not use outdated broker load data (configurable by loadBalancerBrokerLoadDataTTLInSeconds).
+ * 6. Give enough time for each broker to recompute its load after unloading
+ * (configurable by loadBalanceUnloadDelayInSeconds)
+ * 7. Do not transfer bundles with namespace isolation policies or anti-affinity group policies.
+ * 8. Limit the max number of brokers to transfer bundle load for each cycle,
+ * (loadBalancerMaxNumberOfBrokerTransfersPerCycle).
+ * 9. Print more logs with a debug option(loadBalancerDebugModeEnabled=true).
+ */
+public class TransferShedder implements NamespaceUnloadStrategy {
+    private static final Logger log = LoggerFactory.getLogger(TransferShedder.class);
+    private static final double KB = 1024;
+    private final LoadStats stats = new LoadStats();
+    private final PulsarService pulsar;
+    private final SimpleResourceAllocationPolicies allocationPolicies;
+
+    private final UnloadDecision decision = new UnloadDecision();
+
+    @VisibleForTesting
+    public TransferShedder(){
+        this.pulsar = null;
+        this.allocationPolicies = null;
+    }
+
+    public TransferShedder(PulsarService pulsar){
+        this.pulsar = pulsar;
+        this.allocationPolicies = new SimpleResourceAllocationPolicies(pulsar);
+    }
+
+
+    @Getter
+    @Accessors(fluent = true)
+    static class LoadStats {
+        private double sum;
+        private double sqSum;
+        private int totalBrokers;
+        private double avg;
+        private double std;
+        private MinMaxPriorityQueue<String> minBrokers;
+        private MinMaxPriorityQueue<String> maxBrokers;
+        private LoadDataStore<BrokerLoadData> loadDataStore;
+
+        LoadStats() {
+            this.minBrokers = MinMaxPriorityQueue.orderedBy((a, b) -> Double.compare(
+                    loadDataStore.get((String) b).get().getWeightedMaxEMA(),
+                    loadDataStore.get((String) a).get().getWeightedMaxEMA())).create();
+            this.maxBrokers = MinMaxPriorityQueue.orderedBy((a, b) -> Double.compare(
+                    loadDataStore.get((String) a).get().getWeightedMaxEMA(),
+                    loadDataStore.get((String) b).get().getWeightedMaxEMA())).create();
+        }
+
+        private void update(double sum, double sqSum, int totalBrokers) {
+            this.sum = sum;
+            this.sqSum = sqSum;
+            this.totalBrokers = totalBrokers;
+
+            if (totalBrokers == 0) {
+                this.avg = 0;
+                this.std = 0;
+                minBrokers.clear();
+                maxBrokers.clear();
+            } else {
+                this.avg = sum / totalBrokers;
+                this.std = Math.sqrt(sqSum / totalBrokers - avg * avg);
+            }
+        }
+
+        void offload(double max, double min, double offload) {
+            sqSum -= max * max + min * min;
+            double maxd = Math.max(0, max - offload);
+            double mind = min + offload;
+            sqSum += maxd * maxd + mind * mind;
+            std = Math.sqrt(sqSum / totalBrokers - avg * avg);
+        }
+
+        void clear(){
+            sum = 0.0;
+            sqSum = 0.0;
+            totalBrokers = 0;
+            avg = 0.0;
+            std = 0.0;
+            minBrokers.clear();
+            maxBrokers.clear();
+        }
+
+        Optional<UnloadDecision.Reason> update(final LoadDataStore<BrokerLoadData> loadStore,
+                                               Map<String, Long> recentlyUnloadedBrokers,
+                                               final ServiceConfiguration conf) {
+
+
+            UnloadDecision.Reason decisionReason = null;
+            double sum = 0.0;
+            double sqSum = 0.0;
+            int totalBrokers = 0;
+            int maxTransfers = conf.getLoadBalancerMaxNumberOfBrokerTransfersPerCycle();
+            long now = System.currentTimeMillis();
+            for (Map.Entry<String, BrokerLoadData> entry : loadStore.entrySet()) {
+                BrokerLoadData localBrokerData = entry.getValue();
+                String broker = entry.getKey();
+
+                // We don't want to use the outdated load data.
+                if (now - localBrokerData.getUpdatedAt()
+                        > conf.getLoadBalancerBrokerLoadDataTTLInSeconds() * 1000) {
+                    log.warn(
+                            "Ignoring broker:{} load update because the load data timestamp:{} is too old.",
+                            broker, localBrokerData.getUpdatedAt());
+                    decisionReason = OutDatedData;
+                    continue;
+                }
+
+                // Also, we should give enough time for each broker to recompute its load after transfers.
+                if (recentlyUnloadedBrokers.containsKey(broker)) {
+                    if (localBrokerData.getUpdatedAt() - recentlyUnloadedBrokers.get(broker)
+                            < conf.getLoadBalanceUnloadDelayInSeconds() * 1000) {
+                        log.warn(
+                                "Broker:{} load data timestamp:{} is too early since "
+                                        + "the last transfer timestamp:{}. Stop unloading.",
+                                broker, localBrokerData.getUpdatedAt(), recentlyUnloadedBrokers.get(broker));
+                        update(0.0, 0.0, 0);
+                        return Optional.of(CoolDown);
+                    } else {
+                        recentlyUnloadedBrokers.remove(broker);
+                    }
+                }
+
+                double load = localBrokerData.getWeightedMaxEMA();
+
+                minBrokers.offer(broker);
+                if (minBrokers.size() > maxTransfers) {
+                    minBrokers.poll();
+                }
+                maxBrokers.offer(broker);
+                if (maxBrokers.size() > maxTransfers) {
+                    maxBrokers.poll();
+                }
+                sum += load;
+                sqSum += load * load;
+                totalBrokers++;
+            }
+
+
+            if (totalBrokers == 0) {
+                if (decisionReason == null) {
+                    decisionReason = NoBrokers;
+                }
+                update(0.0, 0.0, 0);
+                return Optional.of(decisionReason);
+            }
+
+            update(sum, sqSum, totalBrokers);
+            return Optional.empty();
+        }
+
+        boolean hasTransferableBrokers() {
+            return !(maxBrokers.isEmpty() || minBrokers.isEmpty()
+                    || maxBrokers.peekLast().equals(minBrokers().peekLast()));
+        }
+
+        void setLoadDataStore(LoadDataStore<BrokerLoadData> loadDataStore) {
+            this.loadDataStore = loadDataStore;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "sum:%.2f, sqSum:%.2f, avg:%.2f, std:%.2f, totalBrokers:%d, "
+                            + "minBrokers:%s, maxBrokers:%s",
+                    sum, sqSum, avg, std, totalBrokers, minBrokers, maxBrokers);
+        }
+    }
+
+
+    @Override
+    public UnloadDecision findBundlesForUnloading(LoadManagerContext context,
+                                                  Map<String, Long> recentlyUnloadedBundles,
+                                                  Map<String, Long> recentlyUnloadedBrokers) {
+        final var conf = context.brokerConfiguration();
+        decision.clear();
+        stats.clear();
+        var selectedBundlesCache = decision.getUnloads();
+
+        try {
+            final var loadStore = context.brokerLoadDataStore();
+            stats.setLoadDataStore(loadStore);
+            boolean debugMode = conf.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
+
+            var skipReason = stats.update(context.brokerLoadDataStore(), recentlyUnloadedBrokers, conf);
+            if (!skipReason.isEmpty()) {
+                decision.skip(skipReason.get());
+                log.warn("Failed to update load stat. Reason:{}. Stop unloading.", decision.getReason());
+                return decision;
+            }
+            decision.setLoadAvg(stats.avg);
+            decision.setLoadStd(stats.std);
+
+            if (debugMode) {
+                log.info("brokers' load stats:{}", stats);
+            }
+
+            // success metrics
+            int numOfOverloadedBrokers = 0;
+            int numOfUnderloadedBrokers = 0;
+
+            // skip metrics
+            int numOfBrokersWithEmptyLoadData = 0;
+            int numOfBrokersWithFewBundles = 0;
+
+            final double targetStd = conf.getLoadBalancerBrokerLoadTargetStd();
+            boolean transfer = conf.isLoadBalancerTransferEnabled();
+            while (true) {
+                if (!stats.hasTransferableBrokers()) {
+                    if (debugMode) {
+                        log.info("Exhausted target transfer brokers. Stop unloading");
+                    }
+                    break;
+                }
+                if (stats.std() <= targetStd) {
+                    if (hasMsgThroughput(context, stats.minBrokers.peekLast())) {
+                        if (debugMode) {
+                            log.info("std:{} <= targetStd:{} and minBroker:{} has msg throughput. Stop unloading.",
+                                    stats.std, targetStd, stats.minBrokers.peekLast());
+                        }
+                        break;
+                    } else {
+                        numOfUnderloadedBrokers++;
+                    }
+                } else {
+                    numOfOverloadedBrokers++;
+                }
+
+                String maxBroker = stats.maxBrokers().pollLast();
+                String minBroker = stats.minBrokers().pollLast();
+                Optional<BrokerLoadData> maxBrokerLoadData = context.brokerLoadDataStore().get(maxBroker);
+                Optional<BrokerLoadData> minBrokerLoadData = context.brokerLoadDataStore().get(minBroker);
+                if (maxBrokerLoadData.isEmpty()) {
+                    log.error("maxBroker:{} maxBrokerLoadData is empty. Skip unloading from this max broker.",
+                            maxBroker);
+                    numOfBrokersWithEmptyLoadData++;
+                    continue;
+                }
+                if (minBrokerLoadData.isEmpty()) {
+                    log.error("minBroker:{} minBrokerLoadData is empty. Skip unloading to this min broker.", minBroker);
+                    numOfBrokersWithEmptyLoadData++;
+                    continue;
+                }
+
+                double max = maxBrokerLoadData.get().getWeightedMaxEMA();
+                double min = minBrokerLoadData.get().getWeightedMaxEMA();
+                double offload = (max - min) / 2;
+                BrokerLoadData brokerLoadData = maxBrokerLoadData.get();
+                double brokerThroughput = brokerLoadData.getMsgThroughputIn() + brokerLoadData.getMsgThroughputOut();
+                double offloadThroughput = brokerThroughput * offload;
+
+                if (debugMode) {
+                    log.info(
+                            "Attempting to shed load from broker:{}{}, which has the max resource "
+                                    + "usage {}%, targetStd:{},"
+                                    + " -- Offloading {}%, at least {} KByte/s of traffic, left throughput {} KByte/s",
+                            maxBroker, transfer ? " to broker:" + minBroker : "",
+                            100 * max, targetStd,
+                            offload * 100, offloadThroughput / KB, (brokerThroughput - offloadThroughput) / KB);
+                }
+
+                MutableDouble trafficMarkedToOffload = new MutableDouble(0);
+                MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
+
+                Optional<TopBundlesLoadData> bundlesLoadData = context.topBundleLoadDataStore().get(maxBroker);
+                if (bundlesLoadData.isEmpty() || bundlesLoadData.get().getTopBundlesLoadData().isEmpty()) {
+                    log.error("maxBroker:{} topBundlesLoadData is empty. Skip unloading from this broker.", maxBroker);
+                    numOfBrokersWithEmptyLoadData++;
+                    continue;
+                }
+
+                var topBundlesLoadData = bundlesLoadData.get().getTopBundlesLoadData();
+                if (topBundlesLoadData.size() > 1) {
+                    MutableInt remainingTopBundles = new MutableInt();
+                    topBundlesLoadData.stream()
+                            .filter(e ->
+                                    !recentlyUnloadedBundles.containsKey(e.bundleName()) && isTransferable(
+                                            e.bundleName())
+                            ).map((e) -> {
+                                String bundle = e.bundleName();
+                                var bundleData = e.stats();
+                                double throughput = bundleData.msgThroughputIn + bundleData.msgThroughputOut;
+                                remainingTopBundles.increment();
+                                return Pair.of(bundle, throughput);
+                            }).sorted((e1, e2) ->
+                                    Double.compare(e2.getRight(), e1.getRight())
+                            ).forEach(e -> {
+                                if (remainingTopBundles.getValue() > 1
+                                        && (trafficMarkedToOffload.doubleValue() < offloadThroughput
+                                        || atLeastOneBundleSelected.isFalse())) {
+                                    if (transfer) {
+                                        selectedBundlesCache.put(maxBroker,
+                                                new Unload(maxBroker, e.getLeft(),
+                                                        Optional.of(minBroker)));
+                                    } else {
+                                        selectedBundlesCache.put(maxBroker,
+                                                new Unload(maxBroker, e.getLeft()));
+                                    }
+                                    trafficMarkedToOffload.add(e.getRight());
+                                    atLeastOneBundleSelected.setTrue();
+                                    remainingTopBundles.decrement();
+                                }
+                            });
+                    if (atLeastOneBundleSelected.isFalse()) {
+                        numOfBrokersWithFewBundles++;
+                    }
+                } else if (topBundlesLoadData.size() == 1) {
+                    numOfBrokersWithFewBundles++;
+                    log.warn(
+                            "HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
+                                    + "No Load Shedding will be done on this broker",
+                            topBundlesLoadData.iterator().next(), maxBroker);
+                } else {
+                    numOfBrokersWithFewBundles++;
+                    log.warn("Broker {} is overloaded despite having no bundles", maxBroker);
+                }
+
+
+
+                if (trafficMarkedToOffload.getValue() > 0) {
+                    stats.offload(max, min, offload);
+                    if (debugMode) {
+                        log.info(
+                                String.format("brokers' load stats:%s, after offload{max:%.2f, min:%.2f, offload:%.2f}",
+                                        stats, max, min, offload));
+                    }
+                }
+            }
+
+            if (debugMode) {
+                log.info("selectedBundlesCache:{}", selectedBundlesCache);
+            }
+
+            if (decision.getUnloads().isEmpty()) {
+                decision.skip(
+                        numOfOverloadedBrokers,
+                        numOfUnderloadedBrokers,
+                        numOfBrokersWithEmptyLoadData,
+                        numOfBrokersWithFewBundles);
+            } else {
+                decision.succeed(
+                        numOfOverloadedBrokers,
+                        numOfUnderloadedBrokers);
+            }
+        } catch (Throwable e) {
+            log.error("Failed to process unloading. ", e);
+            decision.fail();
+        }
+
+        return decision;
+    }
+
+
+    private boolean hasMsgThroughput(LoadManagerContext context, String broker) {
+        var brokerLoadDataOptional = context.brokerLoadDataStore().get(broker);
+        if (brokerLoadDataOptional.isEmpty()) {
+            return false;
+        }
+        var brokerLoadData = brokerLoadDataOptional.get();
+        return brokerLoadData.getMsgThroughputIn() + brokerLoadData.getMsgThroughputOut() > 0.0;
+    }
+
+
+    private boolean isTransferable(String bundle) {
+        if (pulsar == null || allocationPolicies == null) {
+            return true;
+        }
+        NamespaceName namespace = NamespaceName.get(LoadManagerShared.getNamespaceNameFromBundleName(bundle));
+        if (allocationPolicies.areIsolationPoliciesPresent(namespace)) {
+            return false;
+        }
+
+        try {
+            var localPoliciesOptional = pulsar
+                    .getPulsarResources().getLocalPolicies().getLocalPolicies(namespace);
+            if (localPoliciesOptional.isPresent() && StringUtils.isNotBlank(
+                    localPoliciesOptional.get().namespaceAntiAffinityGroup)) {
+                return false;
+            }
+        } catch (MetadataStoreException e) {
+            log.error("Failed to get localPolicies. Assumes that bundle:{} is not transferable.", bundle, e);
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java
index cedf7bca5d5..de5975ba49b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java
@@ -60,7 +60,7 @@ public class BrokerLoadDataTest {
         usage1.setBandwidthIn(bandwidthIn);
         usage1.setBandwidthOut(bandwidthOut);
         data.update(usage1, 1,2,3,4, conf);
-
+        
         assertEquals(data.getCpu(), cpu);
         assertEquals(data.getMemory(), memory);
         assertEquals(data.getDirectMemory(), directMemory);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
new file mode 100644
index 00000000000..a16cf2d8a67
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Skip;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Balanced;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.CoolDown;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBrokers;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBundles;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoLoadData;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.OutDatedData;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.commons.math3.stat.descriptive.moment.Mean;
+import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
+import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
+import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.broker.resources.LocalPoliciesResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.common.policies.data.LocalPolicies;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+import org.testng.annotations.Test;
+
+
+@Test(groups = "broker")
+public class TransferShedderTest {
+    double setupLoadAvg = 0.36400000000000005;
+    double setupLoadStd = 0.3982762860126121;
+    public LoadManagerContext setupContext(){
+        var ctx = getContext();
+        ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
+
+        var brokerLoadDataStore = ctx.brokerLoadDataStore();
+        brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx,  2));
+        brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx,  4));
+        brokerLoadDataStore.pushAsync("broker3", getCpuLoad(ctx,  6));
+        brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx,  80));
+        brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx,  90));
+
+        var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
+        topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("bundleA", 1, 1));
+        topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("bundleB", 3, 1));
+        topBundlesLoadDataStore.pushAsync("broker3", getTopBundlesLoad("bundleC", 4, 2));
+        topBundlesLoadDataStore.pushAsync("broker4", getTopBundlesLoad("bundleD", 20, 60));
+        topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("bundleE", 70, 20));
+        return ctx;
+    }
+
+    public LoadManagerContext setupContext(int clusterSize) {
+        var ctx = getContext();
+        ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
+
+        var brokerLoadDataStore = ctx.brokerLoadDataStore();
+        var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
+
+        Random rand = new Random();
+        for (int i = 0; i < clusterSize; i++) {
+            int brokerLoad = rand.nextInt(100);
+            brokerLoadDataStore.pushAsync("broker" + i, getCpuLoad(ctx,  brokerLoad));
+            int bundleLoad = rand.nextInt(brokerLoad + 1);
+            topBundlesLoadDataStore.pushAsync("broker" + i, getTopBundlesLoad("bundle" + i,
+                    bundleLoad, brokerLoad - bundleLoad));
+        }
+        return ctx;
+    }
+
+    public BrokerLoadData getCpuLoad(LoadManagerContext ctx, int load) {
+        var loadData = new BrokerLoadData();
+        SystemResourceUsage usage1 = new SystemResourceUsage();
+        var cpu = new ResourceUsage(load, 100.0);
+        var memory = new ResourceUsage(0.0, 100.0);
+        var directMemory= new ResourceUsage(0.0, 100.0);
+        var bandwidthIn= new ResourceUsage(0.0, 100.0);
+        var bandwidthOut= new ResourceUsage(0.0, 100.0);
+        usage1.setCpu(cpu);
+        usage1.setMemory(memory);
+        usage1.setDirectMemory(directMemory);
+        usage1.setBandwidthIn(bandwidthIn);
+        usage1.setBandwidthOut(bandwidthOut);
+        loadData.update(usage1, 1,2,3,4,
+                ctx.brokerConfiguration());
+        return loadData;
+    }
+
+    public TopBundlesLoadData getTopBundlesLoad(String bundlePrefix, int load1, int load2) {
+        var namespaceBundleStats1 = new NamespaceBundleStats();
+        namespaceBundleStats1.msgThroughputOut = load1;
+        var namespaceBundleStats2 = new NamespaceBundleStats();
+        namespaceBundleStats2.msgThroughputOut = load2;
+        var topLoadData = TopBundlesLoadData.of(List.of(
+                new TopBundlesLoadData.BundleLoadData(bundlePrefix + "-1", namespaceBundleStats1),
+                new TopBundlesLoadData.BundleLoadData(bundlePrefix + "-2", namespaceBundleStats2)),  2);
+        return topLoadData;
+    }
+
+    public TopBundlesLoadData getTopBundlesLoad(String bundlePrefix, int load1) {
+        var namespaceBundleStats1 = new NamespaceBundleStats();
+        namespaceBundleStats1.msgThroughputOut = load1;
+        var topLoadData = TopBundlesLoadData.of(List.of(
+                new TopBundlesLoadData.BundleLoadData(bundlePrefix + "-1", namespaceBundleStats1)), 2);
+        return topLoadData;
+    }
+
+    public LoadManagerContext getContext(){
+        var ctx = mock(LoadManagerContext.class);
+        var conf = new ServiceConfiguration();
+        var brokerLoadDataStore = new LoadDataStore<BrokerLoadData>() {
+            Map<String, BrokerLoadData> map = new HashMap<>();
+            @Override
+            public void close() throws IOException {
+
+            }
+
+            @Override
+            public CompletableFuture<Void> pushAsync(String key, BrokerLoadData loadData) {
+                map.put(key, loadData);
+                return null;
+            }
+
+            @Override
+            public CompletableFuture<Void> removeAsync(String key) {
+                return null;
+            }
+
+            @Override
+            public Optional<BrokerLoadData> get(String key) {
+                var val = map.get(key);
+                if (val == null) {
+                    return Optional.empty();
+                }
+                return Optional.of(val);
+            }
+
+            @Override
+            public void forEach(BiConsumer<String, BrokerLoadData> action) {
+
+            }
+
+            @Override
+            public Set<Map.Entry<String, BrokerLoadData>> entrySet() {
+                return map.entrySet();
+            }
+
+            @Override
+            public int size() {
+                return map.size();
+            }
+        };
+
+        var topBundleLoadDataStore = new LoadDataStore<TopBundlesLoadData>() {
+            Map<String, TopBundlesLoadData> map = new HashMap<>();
+            @Override
+            public void close() throws IOException {
+
+            }
+
+            @Override
+            public CompletableFuture<Void> pushAsync(String key, TopBundlesLoadData loadData) {
+                map.put(key, loadData);
+                return null;
+            }
+
+            @Override
+            public CompletableFuture<Void> removeAsync(String key) {
+                return null;
+            }
+
+            @Override
+            public Optional<TopBundlesLoadData> get(String key) {
+                var val = map.get(key);
+                if (val == null) {
+                    return Optional.empty();
+                }
+                return Optional.of(val);
+            }
+
+            @Override
+            public void forEach(BiConsumer<String, TopBundlesLoadData> action) {
+
+            }
+
+            @Override
+            public Set<Map.Entry<String, TopBundlesLoadData>> entrySet() {
+                return map.entrySet();
+            }
+
+            @Override
+            public int size() {
+                return map.size();
+            }
+        };
+        doReturn(conf).when(ctx).brokerConfiguration();
+        doReturn(brokerLoadDataStore).when(ctx).brokerLoadDataStore();
+        doReturn(topBundleLoadDataStore).when(ctx).topBundleLoadDataStore();
+        return ctx;
+    }
+
+    @Test
+    public void testEmptyBrokerLoadData() {
+        TransferShedder transferShedder = new TransferShedder();
+        var ctx = getContext();
+        ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
+        var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+        var expected = new UnloadDecision();
+        expected.setLabel(Skip);
+        expected.setReason(NoBrokers);
+        assertEquals(res, expected);
+    }
+
+    @Test
+    public void testEmptyTopBundlesLoadData() {
+        TransferShedder transferShedder = new TransferShedder();
+        var ctx = getContext();
+        ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
+        var brokerLoadDataStore = ctx.brokerLoadDataStore();
+
+        brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx,  90));
+        brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx,  10));
+        brokerLoadDataStore.pushAsync("broker3", getCpuLoad(ctx,  20));
+
+        var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+        var expected = new UnloadDecision();
+        expected.setLabel(Skip);
+        expected.setReason(NoLoadData);
+        expected.setLoadAvg(0.39999999999999997);
+        expected.setLoadStd(0.35590260840104376);
+        assertEquals(res, expected);
+    }
+
+    @Test
+    public void testOutDatedLoadData() throws IllegalAccessException {
+        TransferShedder transferShedder = new TransferShedder();
+        var ctx = setupContext();
+        var brokerLoadDataStore = ctx.brokerLoadDataStore();
+        var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+        assertEquals(res.getUnloads().size(), 2);
+
+
+        FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker1").get(), "updatedAt", 0, true);
+        FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker2").get(), "updatedAt", 0, true);
+        FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker3").get(), "updatedAt", 0, true);
+        FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker4").get(), "updatedAt", 0, true);
+        FieldUtils.writeDeclaredField(brokerLoadDataStore.get("broker5").get(), "updatedAt", 0, true);
+
+
+        res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var expected = new UnloadDecision();
+        expected.setLabel(Skip);
+        expected.setReason(OutDatedData);
+        assertEquals(res, expected);
+    }
+
+    @Test
+    public void testRecentlyUnloadedBrokers() {
+        TransferShedder transferShedder = new TransferShedder();
+        var ctx = setupContext();
+
+        Map<String, Long> recentlyUnloadedBrokers = new HashMap<>();
+        var oldTS = System.currentTimeMillis() - ctx.brokerConfiguration()
+                .getLoadBalancerBrokerLoadDataTTLInSeconds() * 1001;
+        recentlyUnloadedBrokers.put("broker1", oldTS);
+        var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), recentlyUnloadedBrokers);
+
+        var expected = new UnloadDecision();
+        var unloads = expected.getUnloads();
+        unloads.put("broker5",
+                new Unload("broker5", "bundleE-1", Optional.of("broker1")));
+        unloads.put("broker4",
+                new Unload("broker4", "bundleD-2", Optional.of("broker2")));
+
+        expected.setLabel(Success);
+        expected.setReason(Overloaded);
+        expected.setLoadAvg(setupLoadAvg);
+        expected.setLoadStd(setupLoadStd);
+        assertEquals(res, expected);
+
+        var now = System.currentTimeMillis();
+        recentlyUnloadedBrokers.put("broker1", now);
+        res = transferShedder.findBundlesForUnloading(ctx, Map.of(), recentlyUnloadedBrokers);
+
+        expected = new UnloadDecision();
+        expected.setLabel(Skip);
+        expected.setReason(CoolDown);
+        assertEquals(res, expected);
+    }
+
+    @Test
+    public void testRecentlyUnloadedBundles() {
+        TransferShedder transferShedder = new TransferShedder();
+        var ctx = setupContext();
+        Map<String, Long> recentlyUnloadedBundles = new HashMap<>();
+        var now = System.currentTimeMillis();
+        recentlyUnloadedBundles.put("bundleE-1", now);
+        recentlyUnloadedBundles.put("bundleE-2", now);
+        recentlyUnloadedBundles.put("bundleD-1", now);
+        recentlyUnloadedBundles.put("bundleD-2", now);
+        var res = transferShedder.findBundlesForUnloading(ctx, recentlyUnloadedBundles, Map.of());
+
+        var expected = new UnloadDecision();
+        expected.setLabel(Skip);
+        expected.skip(NoBundles);
+        expected.setLoadAvg(setupLoadAvg);
+        expected.setLoadStd(setupLoadStd);
+        assertEquals(res, expected);
+    }
+
+    @Test
+    public void testBundlesWithIsolationPolicies() throws IllegalAccessException {
+        var pulsar = mock(PulsarService.class);
+        TransferShedder transferShedder = new TransferShedder(pulsar);
+        var allocationPoliciesSpy = (SimpleResourceAllocationPolicies)
+                spy(FieldUtils.readDeclaredField(transferShedder, "allocationPolicies", true));
+        doReturn(true).when(allocationPoliciesSpy).areIsolationPoliciesPresent(any());
+        FieldUtils.writeDeclaredField(transferShedder, "allocationPolicies", allocationPoliciesSpy, true);
+        var ctx = setupContext();
+        var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
+        topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("my-tenant/my-namespaceA/0x00000000_0xFFFFFFF", 1, 3));
+        topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("my-tenant/my-namespaceB/0x00000000_0xFFFFFFF", 2, 8));
+        topBundlesLoadDataStore.pushAsync("broker3", getTopBundlesLoad("my-tenant/my-namespaceC/0x00000000_0xFFFFFFF", 6, 10));
+        topBundlesLoadDataStore.pushAsync("broker4", getTopBundlesLoad("my-tenant/my-namespaceD/0x00000000_0xFFFFFFF", 10, 20));
+        topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("my-tenant/my-namespaceE/0x00000000_0xFFFFFFF", 70, 20));
+        var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var expected = new UnloadDecision();
+        expected.setLabel(Skip);
+        expected.skip(NoBundles);
+        expected.setLoadAvg(setupLoadAvg);
+        expected.setLoadStd(setupLoadStd);
+        assertEquals(res, expected);
+    }
+
+    @Test
+    public void testBundlesWithAntiAffinityGroup() throws IllegalAccessException, MetadataStoreException {
+        var pulsar = mock(PulsarService.class);
+        TransferShedder transferShedder = new TransferShedder(pulsar);
+        var allocationPoliciesSpy = (SimpleResourceAllocationPolicies)
+                spy(FieldUtils.readDeclaredField(transferShedder, "allocationPolicies", true));
+        doReturn(false).when(allocationPoliciesSpy).areIsolationPoliciesPresent(any());
+        FieldUtils.writeDeclaredField(transferShedder, "allocationPolicies", allocationPoliciesSpy, true);
+
+        var pulsarResourcesMock = mock(PulsarResources.class);
+        var localPoliciesResourcesMock = mock(LocalPoliciesResources.class);
+        doReturn(pulsarResourcesMock).when(pulsar).getPulsarResources();
+        doReturn(localPoliciesResourcesMock).when(pulsarResourcesMock).getLocalPolicies();
+        LocalPolicies localPolicies = new LocalPolicies(null, null, "namespaceAntiAffinityGroup");
+        doReturn(Optional.of(localPolicies)).when(localPoliciesResourcesMock).getLocalPolicies(any());
+
+        var ctx = setupContext();
+        var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
+        topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("my-tenant/my-namespaceA/0x00000000_0xFFFFFFF", 1, 3));
+        topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("my-tenant/my-namespaceB/0x00000000_0xFFFFFFF", 2, 8));
+        topBundlesLoadDataStore.pushAsync("broker3", getTopBundlesLoad("my-tenant/my-namespaceC/0x00000000_0xFFFFFFF", 6, 10));
+        topBundlesLoadDataStore.pushAsync("broker4", getTopBundlesLoad("my-tenant/my-namespaceD/0x00000000_0xFFFFFFF", 10, 20));
+        topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("my-tenant/my-namespaceE/0x00000000_0xFFFFFFF", 70, 20));
+        var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var expected = new UnloadDecision();
+        expected.setLabel(Skip);
+        expected.skip(NoBundles);
+        expected.setLoadAvg(setupLoadAvg);
+        expected.setLoadStd(setupLoadStd);
+        assertEquals(res, expected);
+    }
+
+    @Test
+    public void testTargetStd() {
+        TransferShedder transferShedder = new TransferShedder();
+        var ctx = getContext();
+        ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
+        var brokerLoadDataStore = ctx.brokerLoadDataStore();
+        brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx,  10));
+        brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx,  20));
+        brokerLoadDataStore.pushAsync("broker3", getCpuLoad(ctx,  30));
+
+        var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
+
+        topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("bundleA", 30, 30));
+        topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("bundleB", 40, 40));
+        topBundlesLoadDataStore.pushAsync("broker3", getTopBundlesLoad("bundleC", 50, 50));
+
+        var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var expected = new UnloadDecision();
+        expected.setLabel(Skip);
+        expected.skip(Balanced);
+        expected.setLoadAvg(0.2000000063578288);
+        expected.setLoadStd(0.08164966587949089);
+        assertEquals(res, expected);
+    }
+
+    @Test
+    public void testSingleTopBundlesLoadData() {
+        TransferShedder transferShedder = new TransferShedder();
+        var ctx = setupContext();
+        var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
+        topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("bundleA", 1));
+        topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("bundleB", 2));
+        topBundlesLoadDataStore.pushAsync("broker3", getTopBundlesLoad("bundleC", 6));
+        topBundlesLoadDataStore.pushAsync("broker4", getTopBundlesLoad("bundleD", 10));
+        topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("bundleE", 70));
+        var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var expected = new UnloadDecision();
+        expected.setLabel(Skip);
+        expected.skip(NoBundles);
+        expected.setLoadAvg(setupLoadAvg);
+        expected.setLoadStd(setupLoadStd);
+        assertEquals(res, expected);
+    }
+
+
+    @Test
+    public void testTargetStdAfterTransfer() {
+        TransferShedder transferShedder = new TransferShedder();
+        var ctx = setupContext();
+        var brokerLoadDataStore = ctx.brokerLoadDataStore();
+        brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx,  55));
+        brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx,  65));
+        var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var expected = new UnloadDecision();
+        var unloads = expected.getUnloads();
+        unloads.put("broker5",
+                new Unload("broker5", "bundleE-1", Optional.of("broker1")));
+        expected.setLabel(Success);
+        expected.setReason(Overloaded);
+        expected.setLoadAvg(0.26400000000000007);
+        expected.setLoadStd(0.27644891028904417);
+        assertEquals(res, expected);
+    }
+
+    @Test
+    public void testMinBrokerWithZeroTraffic() throws IllegalAccessException {
+        TransferShedder transferShedder = new TransferShedder();
+        var ctx = setupContext();
+        var brokerLoadDataStore = ctx.brokerLoadDataStore();
+        brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx,  55));
+        brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx,  65));
+
+        var load = getCpuLoad(ctx,  4);
+        FieldUtils.writeDeclaredField(load,"msgThroughputIn", 0, true);
+        FieldUtils.writeDeclaredField(load,"msgThroughputOut", 0, true);
+        brokerLoadDataStore.pushAsync("broker2", load);
+
+        var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var expected = new UnloadDecision();
+        var unloads = expected.getUnloads();
+        unloads.put("broker5",
+                new Unload("broker5", "bundleE-1", Optional.of("broker1")));
+        unloads.put("broker4",
+                new Unload("broker4", "bundleD-2", Optional.of("broker2")));
+        expected.setLabel(Success);
+        expected.setReason(Underloaded);
+        expected.setLoadAvg(0.26400000000000007);
+        expected.setLoadStd(0.27644891028904417);
+        assertEquals(res, expected);
+    }
+
+    @Test
+    public void testMaxNumberOfTransfersPerShedderCycle() {
+        TransferShedder transferShedder = new TransferShedder();
+        var ctx = setupContext();
+        ctx.brokerConfiguration()
+                .setLoadBalancerMaxNumberOfBrokerTransfersPerCycle(10);
+        var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var expected = new UnloadDecision();
+        var unloads = expected.getUnloads();
+        unloads.put("broker5",
+                new Unload("broker5", "bundleE-1", Optional.of("broker1")));
+        unloads.put("broker4",
+                new Unload("broker4", "bundleD-2", Optional.of("broker2")));
+        expected.setLabel(Success);
+        expected.setReason(Overloaded);
+        expected.setLoadAvg(setupLoadAvg);
+        expected.setLoadStd(setupLoadStd);
+        assertEquals(res, expected);
+    }
+
+    @Test
+    public void testRemainingTopBundles() {
+        TransferShedder transferShedder = new TransferShedder();
+        var ctx = setupContext();
+        var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
+        topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("bundleE", 20, 20));
+        var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var expected = new UnloadDecision();
+        var unloads = expected.getUnloads();
+        unloads.put("broker5",
+                new Unload("broker5", "bundleE-1", Optional.of("broker1")));
+        unloads.put("broker4",
+                new Unload("broker4", "bundleD-2", Optional.of("broker2")));
+        expected.setLabel(Success);
+        expected.setReason(Overloaded);
+        expected.setLoadAvg(setupLoadAvg);
+        expected.setLoadStd(setupLoadStd);
+        assertEquals(res, expected);
+    }
+
+    @Test
+    public void testRandomLoad() throws IllegalAccessException {
+        TransferShedder transferShedder = new TransferShedder();
+        for (int i = 0; i < 5; i++) {
+            var ctx = setupContext(10);
+            var conf = ctx.brokerConfiguration();
+            transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+            var stats = (TransferShedder.LoadStats)
+                    FieldUtils.readDeclaredField(transferShedder, "stats", true);
+            assertTrue(stats.std() <= conf.getLoadBalancerBrokerLoadTargetStd()
+                    || (!stats.hasTransferableBrokers()));
+        }
+    }
+
+    @Test
+    public void testLoadStats() {
+        int numBrokers = 10;
+        double delta = 0.0001;
+        for (int t = 0; t < 5; t++) {
+            var ctx = setupContext(numBrokers);
+            TransferShedder.LoadStats stats = new TransferShedder.LoadStats();
+            var loadStore = ctx.brokerLoadDataStore();
+            stats.setLoadDataStore(loadStore);
+            var conf = ctx.brokerConfiguration();
+            stats.update(loadStore, Map.of(), conf);
+            double[] loads = new double[numBrokers];
+            var brokerLoadDataStore = ctx.brokerLoadDataStore();
+            for (int i = 0; i < loads.length; i++) {
+                loads[i] = loadStore.get("broker" + i).get().getWeightedMaxEMA();
+            }
+            int i = 0;
+            int j = loads.length - 1;
+            Arrays.sort(loads);
+            for (int k = 0; k < conf.getLoadBalancerMaxNumberOfBrokerTransfersPerCycle(); k++) {
+                double minLoad = loads[i];
+                double maxLoad = loads[j];
+                double offload = (maxLoad - minLoad) / 2;
+                Mean mean = new Mean();
+                StandardDeviation std = new StandardDeviation(false);
+                assertEquals(minLoad,
+                        loadStore.get(stats.minBrokers().pollLast()).get().getWeightedMaxEMA());
+                assertEquals(maxLoad,
+                        loadStore.get(stats.maxBrokers().pollLast()).get().getWeightedMaxEMA());
+                assertEquals(stats.totalBrokers(), numBrokers);
+                assertEquals(stats.avg(), mean.evaluate(loads), delta);
+                assertEquals(stats.std(), std.evaluate(loads), delta);
+                stats.offload(maxLoad, minLoad, offload);
+                loads[i++] = minLoad + offload;
+                loads[j--] = maxLoad - offload;
+            }
+        }
+    }
+}