You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2023/02/02 18:06:49 UTC
[pulsar] branch master updated: [improve][broker] PIP-220 Added TransferShedder (#18865)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4caf41a762d [improve][broker] PIP-220 Added TransferShedder (#18865)
4caf41a762d is described below
commit 4caf41a762d758db064020b551d6a2bae1b21c50
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Thu Feb 2 10:06:42 2023 -0800
[improve][broker] PIP-220 Added TransferShedder (#18865)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 64 +++
.../extensions/models/UnloadDecision.java | 117 ++++
.../scheduler/NamespaceUnloadStrategy.java | 16 +-
.../extensions/scheduler/TransferShedder.java | 450 ++++++++++++++++
.../extensions/data/BrokerLoadDataTest.java | 2 +-
.../extensions/scheduler/TransferShedderTest.java | 599 +++++++++++++++++++++
6 files changed, 1239 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 4a9a6e47bf3..557b30245af 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2450,6 +2450,70 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private long namespaceBundleUnloadingTimeoutMs = 60000;
+    // Settings below are read by the load balancer extension (PIP-192); most of them by TransferShedder.
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            dynamic = true,
+            doc = "Option to enable the debug mode for the load balancer logics. "
+                    + "The debug mode prints more logs to provide more information "
+                    + "such as load balance states and decisions. "
+                    + "(only used in load balancer extension logics)"
+    )
+    private boolean loadBalancerDebugModeEnabled = false;
+
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            dynamic = true,
+            doc = "The target standard deviation of the resource usage across brokers "
+                    + "(100% resource usage is 1.0 load). "
+                    + "The shedder logic tries to distribute bundle load across brokers to meet this target std. "
+                    + "The smaller value will incur load balancing more frequently. "
+                    + "(only used in load balancer extension TransferShedder)"
+    )
+    private double loadBalancerBrokerLoadTargetStd = 0.25;
+
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            dynamic = true,
+            doc = "Option to enable the bundle transfer mode when distributing bundle loads. "
+                    + "On: transfer bundles from overloaded brokers to underloaded "
+                    + "-- pre-assigns the destination broker upon unloading. "
+                    + "Off: unload bundles from overloaded brokers "
+                    + "-- post-assigns the destination broker upon lookups. "
+                    + "(only used in load balancer extension TransferShedder)"
+    )
+    private boolean loadBalancerTransferEnabled = true;
+
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            dynamic = true,
+            doc = "Maximum number of brokers to transfer bundle load for each unloading cycle. "
+                    + "The bigger value will incur more unloading/transfers for each unloading cycle. "
+                    + "(only used in load balancer extension TransferShedder)"
+    )
+    private int loadBalancerMaxNumberOfBrokerTransfersPerCycle = 3;
+
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            dynamic = true,
+            doc = "Delay (in seconds) to the next unloading cycle after unloading. "
+                    + "The logic tries to give enough time for brokers to recompute load after unloading. "
+                    + "The bigger value will delay the next unloading cycle longer. "
+                    + "(only used in load balancer extension TransferShedder)"
+    )
+    private long loadBalanceUnloadDelayInSeconds = 600;
+
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            dynamic = true,
+            doc = "Broker load data time to live (TTL in seconds). "
+                    + "The logic tries to avoid (possibly unavailable) brokers with out-dated load data, "
+                    + "and those brokers will be ignored in the load computation. "
+                    + "When tuning this value, please consider loadBalancerReportUpdateMaxIntervalMinutes. "
+                    + "The current default is loadBalancerReportUpdateMaxIntervalMinutes * 2. "
+                    + "(only used in load balancer extension TransferShedder)"
+    )
+    private long loadBalancerBrokerLoadDataTTLInSeconds = 1800;
+
/**** --- Replication. --- ****/
@FieldContext(
category = CATEGORY_REPLICATION,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadDecision.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadDecision.java
new file mode 100644
index 00000000000..7d6651e3ff9
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadDecision.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.models;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Skip;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Balanced;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBundles;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoLoadData;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import lombok.Data;
+
+/**
+ * Result of one load-shedding evaluation: the service units (e.g. bundles) picked for unload or
+ * transfer, a {@link Label}/{@link Reason} pair describing the outcome, and the broker load
+ * average and standard deviation recorded by the strategy that produced it.
+ */
+@Data
+public class UnloadDecision {
+    Multimap<String, Unload> unloads;
+    Label label;
+    Reason reason;
+    Double loadAvg;
+    Double loadStd;
+
+    /** Overall outcome of the evaluation. */
+    public enum Label {
+        Success,
+        Skip,
+        Failure
+    }
+
+    /** Why the evaluation ended with its {@link Label}. */
+    public enum Reason {
+        Overloaded,
+        Underloaded,
+        Balanced,
+        NoBundles,
+        CoolDown,
+        OutDatedData,
+        NoLoadData,
+        NoBrokers,
+        Unknown
+    }
+
+    /** Creates an empty decision: no unloads, and every other field left null. */
+    public UnloadDecision() {
+        // label, reason, loadAvg and loadStd are reference fields and therefore already null.
+        unloads = ArrayListMultimap.create();
+    }
+
+    /** Resets this instance to the state produced by the constructor, reusing the same multimap. */
+    public void clear() {
+        unloads.clear();
+        loadStd = null;
+        loadAvg = null;
+        reason = null;
+        label = null;
+    }
+
+    /**
+     * Marks the decision as skipped and derives the reason from the given counters.
+     * The checks are ordered: balanced first, then missing load data, then too few bundles.
+     *
+     * @param numOfOverloadedBrokers        number of overloaded brokers seen
+     * @param numOfUnderloadedBrokers       number of underloaded brokers seen
+     * @param numOfBrokersWithEmptyLoadData number of brokers whose load data was missing
+     * @param numOfBrokersWithFewBundles    number of brokers with too few bundles to shed
+     */
+    public void skip(int numOfOverloadedBrokers,
+                     int numOfUnderloadedBrokers,
+                     int numOfBrokersWithEmptyLoadData,
+                     int numOfBrokersWithFewBundles) {
+        final boolean nothingToMove = numOfOverloadedBrokers == 0 && numOfUnderloadedBrokers == 0;
+        label = Skip;
+        reason = nothingToMove ? Balanced
+                : numOfBrokersWithEmptyLoadData > 0 ? NoLoadData
+                : numOfBrokersWithFewBundles > 0 ? NoBundles
+                : Unknown;
+    }
+
+    /**
+     * Marks the decision as skipped for an explicitly supplied reason.
+     *
+     * @param reason the reason to record
+     */
+    public void skip(Reason reason) {
+        this.reason = reason;
+        label = Skip;
+    }
+
+    /**
+     * Marks the decision as successful. The reason is {@code Overloaded} only when overloaded
+     * brokers strictly outnumber underloaded ones; otherwise it is {@code Underloaded}.
+     *
+     * @param numOfOverloadedBrokers  number of overloaded brokers seen
+     * @param numOfUnderloadedBrokers number of underloaded brokers seen
+     */
+    public void succeed(
+            int numOfOverloadedBrokers,
+            int numOfUnderloadedBrokers) {
+        label = Success;
+        reason = numOfOverloadedBrokers > numOfUnderloadedBrokers ? Overloaded : Underloaded;
+    }
+
+    /** Marks the decision as failed with an unknown reason. */
+    public void fail() {
+        reason = Unknown;
+        label = Failure;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java
index 0942dc9522e..b4dc92d9218 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java
@@ -18,10 +18,9 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
-import java.util.List;
import java.util.Map;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
-import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
+import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
/**
* The namespace unload strategy.
@@ -34,12 +33,13 @@ public interface NamespaceUnloadStrategy {
/**
* Recommend that all the returned bundles be unloaded.
*
- * @param context The context used for decisions.
- * @param recentlyUnloadedBundles
- * The recently unloaded bundles.
- * @return A list of the bundles that should be unloaded.
+ * @param context The context used for decisions.
+ * @param recentlyUnloadedBundles The recently unloaded bundles.
+ * @param recentlyUnloadedBrokers The recently unloaded brokers.
+ * @return unloadDecision containing a list of the bundles that should be unloaded.
*/
- List<Unload> findBundlesForUnloading(LoadManagerContext context,
- Map<String, Long> recentlyUnloadedBundles);
+ UnloadDecision findBundlesForUnloading(LoadManagerContext context,
+ Map<String, Long> recentlyUnloadedBundles,
+ Map<String, Long> recentlyUnloadedBrokers);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
new file mode 100644
index 00000000000..a1047edc06f
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.CoolDown;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBrokers;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.OutDatedData;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.MinMaxPriorityQueue;
+import java.util.Map;
+import java.util.Optional;
+import lombok.Getter;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableDouble;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
+import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
+import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Load shedding strategy that unloads bundles from the highest loaded brokers.
+ * This strategy is only configurable in the broker load balancer extensions introduced by
+ * PIP-192[https://github.com/apache/pulsar/issues/16691].
+ *
+ * This load shedding strategy has the following goals:
+ * 1. Distribute bundle load across brokers in order to make the standard deviation of the avg resource usage,
+ * std(exponential-moving-avg(max(cpu, memory, network, throughput)) for each broker) below the target,
+ * configurable by loadBalancerBrokerLoadTargetStd.
+ * 2. Use the transfer protocol to transfer bundle load from the highest loaded to the lowest loaded brokers,
+ * if configured by loadBalancerTransferEnabled=true.
+ * 3. Avoid repeated bundle unloading by recomputing historical broker resource usage after unloading and also
+ * skipping the bundles that are recently unloaded.
+ * 4. Prioritize unloading bundles to underloaded brokers when their message throughput is zero(new brokers).
+ * 5. Do not use outdated broker load data (configurable by loadBalancerBrokerLoadDataTTLInSeconds).
+ * 6. Give enough time for each broker to recompute its load after unloading
+ * (configurable by loadBalanceUnloadDelayInSeconds)
+ * 7. Do not transfer bundles with namespace isolation policies or anti-affinity group policies.
+ * 8. Limit the max number of brokers to transfer bundle load for each cycle,
+ * (loadBalancerMaxNumberOfBrokerTransfersPerCycle).
+ * 9. Print more logs with a debug option(loadBalancerDebugModeEnabled=true).
+ */
+public class TransferShedder implements NamespaceUnloadStrategy {
+ private static final Logger log = LoggerFactory.getLogger(TransferShedder.class);
+ // Divisor used to report byte/s throughput values as KByte/s in the debug log.
+ private static final double KB = 1024;
+ // Reused across invocations; cleared at the start of every findBundlesForUnloading call.
+ private final LoadStats stats = new LoadStats();
+ private final PulsarService pulsar;
+ private final SimpleResourceAllocationPolicies allocationPolicies;
+
+ // Single shared result object: cleared at the start of every findBundlesForUnloading call and returned from it.
+ private final UnloadDecision decision = new UnloadDecision();
+
+ /**
+ * Test-only constructor. Leaves {@code pulsar} and {@code allocationPolicies} null,
+ * in which case isTransferable treats every bundle as transferable.
+ */
+ @VisibleForTesting
+ public TransferShedder(){
+ this.pulsar = null;
+ this.allocationPolicies = null;
+ }
+
+ /**
+ * Creates a shedder that consults the given broker service when checking
+ * whether a bundle may be transferred.
+ *
+ * @param pulsar the broker service; also passed to SimpleResourceAllocationPolicies
+ */
+ public TransferShedder(PulsarService pulsar){
+ this.pulsar = pulsar;
+ this.allocationPolicies = new SimpleResourceAllocationPolicies(pulsar);
+ }
+
+
+    /**
+     * Aggregated load statistics for the brokers considered in one unloading cycle.
+     *
+     * <p>Tracks the running sum and sum of squares of each broker's weighted max EMA load, the derived
+     * average and standard deviation, and two bounded queues holding the least and the most loaded
+     * brokers. A load data store must be supplied through {@link #setLoadDataStore} before any broker
+     * is offered, because the queue comparators look the loads up in that store.
+     */
+    @Getter
+    @Accessors(fluent = true)
+    static class LoadStats {
+        private double sum;
+        private double sqSum;
+        private int totalBrokers;
+        private double avg;
+        private double std;
+        private MinMaxPriorityQueue<String> minBrokers;
+        private MinMaxPriorityQueue<String> maxBrokers;
+        private LoadDataStore<BrokerLoadData> loadDataStore;
+
+        LoadStats() {
+            // Ordered by descending load: poll() evicts the most loaded broker and
+            // peekLast()/pollLast() yield the least loaded one.
+            this.minBrokers = MinMaxPriorityQueue.orderedBy((a, b) -> Double.compare(
+                    loadDataStore.get((String) b).get().getWeightedMaxEMA(),
+                    loadDataStore.get((String) a).get().getWeightedMaxEMA())).create();
+            // Ordered by ascending load: poll() evicts the least loaded broker and
+            // peekLast()/pollLast() yield the most loaded one.
+            this.maxBrokers = MinMaxPriorityQueue.orderedBy((a, b) -> Double.compare(
+                    loadDataStore.get((String) a).get().getWeightedMaxEMA(),
+                    loadDataStore.get((String) b).get().getWeightedMaxEMA())).create();
+        }
+
+        /**
+         * Computes the population standard deviation from the sum of squares and the average.
+         *
+         * <p>The variance {@code E[x^2] - E[x]^2} is never negative mathematically, but floating-point
+         * rounding can push it slightly below zero when all loads are (nearly) equal. An unguarded
+         * {@code Math.sqrt} would then return NaN, and every {@code std <= target} comparison would be
+         * false, so a perfectly balanced cluster would be treated as overloaded. The variance is
+         * therefore clamped at zero.
+         */
+        private static double computeStd(double sqSum, int totalBrokers, double avg) {
+            return Math.sqrt(Math.max(0.0, sqSum / totalBrokers - avg * avg));
+        }
+
+        /**
+         * Stores the given aggregates and recomputes avg and std from them.
+         * With zero brokers everything is zeroed and both broker queues are emptied.
+         */
+        private void update(double sum, double sqSum, int totalBrokers) {
+            this.sum = sum;
+            this.sqSum = sqSum;
+            this.totalBrokers = totalBrokers;
+
+            if (totalBrokers == 0) {
+                this.avg = 0;
+                this.std = 0;
+                minBrokers.clear();
+                maxBrokers.clear();
+            } else {
+                this.avg = sum / totalBrokers;
+                this.std = computeStd(sqSum, totalBrokers, avg);
+            }
+        }
+
+        /**
+         * Re-estimates std as if {@code offload} load had moved from a broker with load {@code max}
+         * to a broker with load {@code min}. The sum and the average are left untouched.
+         *
+         * @param max     current load of the source broker
+         * @param min     current load of the destination broker
+         * @param offload amount of load assumed to move; the source load is floored at zero
+         */
+        void offload(double max, double min, double offload) {
+            sqSum -= max * max + min * min;
+            double maxd = Math.max(0, max - offload);
+            double mind = min + offload;
+            sqSum += maxd * maxd + mind * mind;
+            std = computeStd(sqSum, totalBrokers, avg);
+        }
+
+        /** Zeroes every aggregate and empties both broker queues. The load data store is kept. */
+        void clear(){
+            sum = 0.0;
+            sqSum = 0.0;
+            totalBrokers = 0;
+            avg = 0.0;
+            std = 0.0;
+            minBrokers.clear();
+            maxBrokers.clear();
+        }
+
+        /**
+         * Rebuilds the statistics from the brokers in {@code loadStore}.
+         *
+         * <p>Brokers whose load data is older than the configured TTL are ignored. If any broker was
+         * unloaded too recently relative to its load data timestamp, the statistics are reset and
+         * {@code CoolDown} is returned immediately. Entries in {@code recentlyUnloadedBrokers} whose
+         * cool-down has elapsed are removed from that map as a side effect.
+         *
+         * @param loadStore               broker load data to aggregate
+         * @param recentlyUnloadedBrokers broker to last-unload timestamp; mutated as described above
+         * @param conf                    source of the TTL, cool-down delay and max-transfer settings
+         * @return a reason to skip this cycle ({@code CoolDown}, or {@code OutDatedData} / {@code NoBrokers}
+         *         when no broker remained usable), or empty when the statistics were updated
+         */
+        Optional<UnloadDecision.Reason> update(final LoadDataStore<BrokerLoadData> loadStore,
+                                               Map<String, Long> recentlyUnloadedBrokers,
+                                               final ServiceConfiguration conf) {
+
+            UnloadDecision.Reason decisionReason = null;
+            double sum = 0.0;
+            double sqSum = 0.0;
+            int totalBrokers = 0;
+            int maxTransfers = conf.getLoadBalancerMaxNumberOfBrokerTransfersPerCycle();
+            long now = System.currentTimeMillis();
+            for (Map.Entry<String, BrokerLoadData> entry : loadStore.entrySet()) {
+                BrokerLoadData localBrokerData = entry.getValue();
+                String broker = entry.getKey();
+
+                // We don't want to use the outdated load data.
+                if (now - localBrokerData.getUpdatedAt()
+                        > conf.getLoadBalancerBrokerLoadDataTTLInSeconds() * 1000) {
+                    log.warn(
+                            "Ignoring broker:{} load update because the load data timestamp:{} is too old.",
+                            broker, localBrokerData.getUpdatedAt());
+                    // Only reported to the caller if no usable broker is left at the end.
+                    decisionReason = OutDatedData;
+                    continue;
+                }
+
+                // Also, we should give enough time for each broker to recompute its load after transfers.
+                if (recentlyUnloadedBrokers.containsKey(broker)) {
+                    if (localBrokerData.getUpdatedAt() - recentlyUnloadedBrokers.get(broker)
+                            < conf.getLoadBalanceUnloadDelayInSeconds() * 1000) {
+                        log.warn(
+                                "Broker:{} load data timestamp:{} is too early since "
+                                        + "the last transfer timestamp:{}. Stop unloading.",
+                                broker, localBrokerData.getUpdatedAt(), recentlyUnloadedBrokers.get(broker));
+                        update(0.0, 0.0, 0);
+                        return Optional.of(CoolDown);
+                    } else {
+                        recentlyUnloadedBrokers.remove(broker);
+                    }
+                }
+
+                double load = localBrokerData.getWeightedMaxEMA();
+
+                // Keep each queue bounded to maxTransfers entries by evicting its head after every offer.
+                minBrokers.offer(broker);
+                if (minBrokers.size() > maxTransfers) {
+                    minBrokers.poll();
+                }
+                maxBrokers.offer(broker);
+                if (maxBrokers.size() > maxTransfers) {
+                    maxBrokers.poll();
+                }
+                sum += load;
+                sqSum += load * load;
+                totalBrokers++;
+            }
+
+            if (totalBrokers == 0) {
+                if (decisionReason == null) {
+                    decisionReason = NoBrokers;
+                }
+                update(0.0, 0.0, 0);
+                return Optional.of(decisionReason);
+            }
+
+            update(sum, sqSum, totalBrokers);
+            return Optional.empty();
+        }
+
+        /**
+         * Tells whether a transfer pair is still available: both queues are non-empty and the
+         * current most loaded and least loaded candidates are different brokers.
+         */
+        boolean hasTransferableBrokers() {
+            return !(maxBrokers.isEmpty() || minBrokers.isEmpty()
+                    || maxBrokers.peekLast().equals(minBrokers().peekLast()));
+        }
+
+        /** Sets the store the queue comparators read broker loads from. */
+        void setLoadDataStore(LoadDataStore<BrokerLoadData> loadDataStore) {
+            this.loadDataStore = loadDataStore;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "sum:%.2f, sqSum:%.2f, avg:%.2f, std:%.2f, totalBrokers:%d, "
+                            + "minBrokers:%s, maxBrokers:%s",
+                    sum, sqSum, avg, std, totalBrokers, minBrokers, maxBrokers);
+        }
+    }
+
+
+ /**
+ * Selects bundles to unload from the most loaded brokers, optionally pairing each source broker
+ * with one of the least loaded brokers as the transfer destination, until the load standard
+ * deviation reaches the configured target or the candidate brokers are exhausted.
+ *
+ * <p>The returned object is the shared {@code decision} field, which is cleared at the start of
+ * every call. NOTE(review): callers should not keep it across calls, and this method does not
+ * look safe for concurrent invocation -- confirm against the caller.
+ *
+ * @param context The context providing the broker configuration and the load data stores.
+ * @param recentlyUnloadedBundles Bundles to exclude from selection.
+ * @param recentlyUnloadedBrokers Broker to last-unload timestamp; passed to the load stats update,
+ * which may remove entries from it.
+ * @return the decision holding the selected unloads, or a skip/failure label with its reason.
+ */
+ @Override
+ public UnloadDecision findBundlesForUnloading(LoadManagerContext context,
+ Map<String, Long> recentlyUnloadedBundles,
+ Map<String, Long> recentlyUnloadedBrokers) {
+ final var conf = context.brokerConfiguration();
+ // Both objects are instance fields reused across invocations, so reset them first.
+ decision.clear();
+ stats.clear();
+ var selectedBundlesCache = decision.getUnloads();
+
+ try {
+ final var loadStore = context.brokerLoadDataStore();
+ stats.setLoadDataStore(loadStore);
+ boolean debugMode = conf.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
+
+ // A present value means the stats could not be built (cool-down, outdated data or no brokers).
+ var skipReason = stats.update(context.brokerLoadDataStore(), recentlyUnloadedBrokers, conf);
+ if (!skipReason.isEmpty()) {
+ decision.skip(skipReason.get());
+ log.warn("Failed to update load stat. Reason:{}. Stop unloading.", decision.getReason());
+ return decision;
+ }
+ decision.setLoadAvg(stats.avg);
+ decision.setLoadStd(stats.std);
+
+ if (debugMode) {
+ log.info("brokers' load stats:{}", stats);
+ }
+
+ // success metrics
+ int numOfOverloadedBrokers = 0;
+ int numOfUnderloadedBrokers = 0;
+
+ // skip metrics
+ int numOfBrokersWithEmptyLoadData = 0;
+ int numOfBrokersWithFewBundles = 0;
+
+ final double targetStd = conf.getLoadBalancerBrokerLoadTargetStd();
+ boolean transfer = conf.isLoadBalancerTransferEnabled();
+ // Each iteration consumes one (max broker, min broker) pair from the stats queues.
+ while (true) {
+ if (!stats.hasTransferableBrokers()) {
+ if (debugMode) {
+ log.info("Exhausted target transfer brokers. Stop unloading");
+ }
+ break;
+ }
+ // Even when the target std is met, keep shedding toward a min broker that reports no message throughput.
+ if (stats.std() <= targetStd) {
+ if (hasMsgThroughput(context, stats.minBrokers.peekLast())) {
+ if (debugMode) {
+ log.info("std:{} <= targetStd:{} and minBroker:{} has msg throughput. Stop unloading.",
+ stats.std, targetStd, stats.minBrokers.peekLast());
+ }
+ break;
+ } else {
+ numOfUnderloadedBrokers++;
+ }
+ } else {
+ numOfOverloadedBrokers++;
+ }
+
+ // pollLast removes the pair from the queues, so every `continue` below still makes progress.
+ String maxBroker = stats.maxBrokers().pollLast();
+ String minBroker = stats.minBrokers().pollLast();
+ Optional<BrokerLoadData> maxBrokerLoadData = context.brokerLoadDataStore().get(maxBroker);
+ Optional<BrokerLoadData> minBrokerLoadData = context.brokerLoadDataStore().get(minBroker);
+ if (maxBrokerLoadData.isEmpty()) {
+ log.error("maxBroker:{} maxBrokerLoadData is empty. Skip unloading from this max broker.",
+ maxBroker);
+ numOfBrokersWithEmptyLoadData++;
+ continue;
+ }
+ if (minBrokerLoadData.isEmpty()) {
+ log.error("minBroker:{} minBrokerLoadData is empty. Skip unloading to this min broker.", minBroker);
+ numOfBrokersWithEmptyLoadData++;
+ continue;
+ }
+
+ double max = maxBrokerLoadData.get().getWeightedMaxEMA();
+ double min = minBrokerLoadData.get().getWeightedMaxEMA();
+ // Aim to move half of the load gap between the two brokers.
+ double offload = (max - min) / 2;
+ BrokerLoadData brokerLoadData = maxBrokerLoadData.get();
+ double brokerThroughput = brokerLoadData.getMsgThroughputIn() + brokerLoadData.getMsgThroughputOut();
+ // Throughput budget to shed: the max broker's throughput scaled by the offload load fraction.
+ double offloadThroughput = brokerThroughput * offload;
+
+ if (debugMode) {
+ log.info(
+ "Attempting to shed load from broker:{}{}, which has the max resource "
+ + "usage {}%, targetStd:{},"
+ + " -- Offloading {}%, at least {} KByte/s of traffic, left throughput {} KByte/s",
+ maxBroker, transfer ? " to broker:" + minBroker : "",
+ 100 * max, targetStd,
+ offload * 100, offloadThroughput / KB, (brokerThroughput - offloadThroughput) / KB);
+ }
+
+ // Mutable holders because they are updated from inside the lambdas below.
+ MutableDouble trafficMarkedToOffload = new MutableDouble(0);
+ MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
+
+ Optional<TopBundlesLoadData> bundlesLoadData = context.topBundleLoadDataStore().get(maxBroker);
+ if (bundlesLoadData.isEmpty() || bundlesLoadData.get().getTopBundlesLoadData().isEmpty()) {
+ log.error("maxBroker:{} topBundlesLoadData is empty. Skip unloading from this broker.", maxBroker);
+ numOfBrokersWithEmptyLoadData++;
+ continue;
+ }
+
+ var topBundlesLoadData = bundlesLoadData.get().getTopBundlesLoadData();
+ if (topBundlesLoadData.size() > 1) {
+ // Counts the bundles that pass the filter. It is incremented in map(), and sorted() must see
+ // every element before emitting any, so the count is complete when forEach starts.
+ MutableInt remainingTopBundles = new MutableInt();
+ topBundlesLoadData.stream()
+ .filter(e ->
+ !recentlyUnloadedBundles.containsKey(e.bundleName()) && isTransferable(
+ e.bundleName())
+ ).map((e) -> {
+ String bundle = e.bundleName();
+ var bundleData = e.stats();
+ double throughput = bundleData.msgThroughputIn + bundleData.msgThroughputOut;
+ remainingTopBundles.increment();
+ return Pair.of(bundle, throughput);
+ }).sorted((e1, e2) ->
+ Double.compare(e2.getRight(), e1.getRight())
+ ).forEach(e -> {
+ // Highest throughput first. Always leave at least one eligible bundle on the max broker,
+ // and select at least one bundle even if the throughput budget is zero.
+ if (remainingTopBundles.getValue() > 1
+ && (trafficMarkedToOffload.doubleValue() < offloadThroughput
+ || atLeastOneBundleSelected.isFalse())) {
+ if (transfer) {
+ selectedBundlesCache.put(maxBroker,
+ new Unload(maxBroker, e.getLeft(),
+ Optional.of(minBroker)));
+ } else {
+ selectedBundlesCache.put(maxBroker,
+ new Unload(maxBroker, e.getLeft()));
+ }
+ trafficMarkedToOffload.add(e.getRight());
+ atLeastOneBundleSelected.setTrue();
+ remainingTopBundles.decrement();
+ }
+ });
+ if (atLeastOneBundleSelected.isFalse()) {
+ numOfBrokersWithFewBundles++;
+ }
+ } else if (topBundlesLoadData.size() == 1) {
+ numOfBrokersWithFewBundles++;
+ log.warn(
+ "HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
+ + "No Load Shedding will be done on this broker",
+ topBundlesLoadData.iterator().next(), maxBroker);
+ } else {
+ // NOTE(review): this branch looks unreachable -- an empty list already hit `continue` above.
+ numOfBrokersWithFewBundles++;
+ log.warn("Broker {} is overloaded despite having no bundles", maxBroker);
+ }
+
+
+
+ // The projected std is only adjusted when the selected bundles carry some throughput.
+ if (trafficMarkedToOffload.getValue() > 0) {
+ stats.offload(max, min, offload);
+ if (debugMode) {
+ log.info(
+ String.format("brokers' load stats:%s, after offload{max:%.2f, min:%.2f, offload:%.2f}",
+ stats, max, min, offload));
+ }
+ }
+ }
+
+ if (debugMode) {
+ log.info("selectedBundlesCache:{}", selectedBundlesCache);
+ }
+
+ // selectedBundlesCache is the same multimap as decision.getUnloads().
+ if (decision.getUnloads().isEmpty()) {
+ decision.skip(
+ numOfOverloadedBrokers,
+ numOfUnderloadedBrokers,
+ numOfBrokersWithEmptyLoadData,
+ numOfBrokersWithFewBundles);
+ } else {
+ decision.succeed(
+ numOfOverloadedBrokers,
+ numOfUnderloadedBrokers);
+ }
+ } catch (Throwable e) {
+ // Any failure is logged and reported through the decision instead of propagating to the caller.
+ log.error("Failed to process unloading. ", e);
+ decision.fail();
+ }
+
+ return decision;
+ }
+
+
+    /**
+     * Tells whether the given broker reports any message throughput.
+     *
+     * @param context source of the broker load data store
+     * @param broker  broker to look up
+     * @return {@code true} when load data exists and its inbound plus outbound message throughput
+     *         is positive; {@code false} when it is not positive or no load data is stored
+     */
+    private boolean hasMsgThroughput(LoadManagerContext context, String broker) {
+        return context.brokerLoadDataStore().get(broker)
+                .map(loadData -> loadData.getMsgThroughputIn() + loadData.getMsgThroughputOut() > 0.0)
+                .orElse(false);
+    }
+
+
+ private boolean isTransferable(String bundle) {
+ if (pulsar == null || allocationPolicies == null) {
+ return true;
+ }
+ NamespaceName namespace = NamespaceName.get(LoadManagerShared.getNamespaceNameFromBundleName(bundle));
+ if (allocationPolicies.areIsolationPoliciesPresent(namespace)) {
+ return false;
+ }
+
+ try {
+ var localPoliciesOptional = pulsar
+ .getPulsarResources().getLocalPolicies().getLocalPolicies(namespace);
+ if (localPoliciesOptional.isPresent() && StringUtils.isNotBlank(
+ localPoliciesOptional.get().namespaceAntiAffinityGroup)) {
+ return false;
+ }
+ } catch (MetadataStoreException e) {
+ log.error("Failed to get localPolicies. Assumes that bundle:{} is not transferable.", bundle, e);
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java
index cedf7bca5d5..de5975ba49b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java
@@ -60,7 +60,7 @@ public class BrokerLoadDataTest {
usage1.setBandwidthIn(bandwidthIn);
usage1.setBandwidthOut(bandwidthOut);
data.update(usage1, 1,2,3,4, conf);
-
+
assertEquals(data.getCpu(), cpu);
assertEquals(data.getMemory(), memory);
assertEquals(data.getDirectMemory(), directMemory);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
new file mode 100644
index 00000000000..a16cf2d8a67
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Skip;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Balanced;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.CoolDown;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBrokers;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBundles;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoLoadData;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.OutDatedData;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.commons.math3.stat.descriptive.moment.Mean;
+import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
+import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
+import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.broker.resources.LocalPoliciesResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.common.policies.data.LocalPolicies;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+import org.testng.annotations.Test;
+
+
+@Test(groups = "broker")
+public class TransferShedderTest {
+ // Expected load average and population standard deviation for the five-broker fixture built by
+ // setupContext(), i.e. for the loads 0.02, 0.04, 0.06, 0.80 and 0.90.
+ double setupLoadAvg = 0.36400000000000005;
+ double setupLoadStd = 0.3982762860126121;
+ public LoadManagerContext setupContext(){
+ var ctx = getContext();
+ ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
+
+ var brokerLoadDataStore = ctx.brokerLoadDataStore();
+ brokerLoadDataStore.pushAsync("broker1", getCpuLoad(ctx, 2));
+ brokerLoadDataStore.pushAsync("broker2", getCpuLoad(ctx, 4));
+ brokerLoadDataStore.pushAsync("broker3", getCpuLoad(ctx, 6));
+ brokerLoadDataStore.pushAsync("broker4", getCpuLoad(ctx, 80));
+ brokerLoadDataStore.pushAsync("broker5", getCpuLoad(ctx, 90));
+
+ var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
+ topBundlesLoadDataStore.pushAsync("broker1", getTopBundlesLoad("bundleA", 1, 1));
+ topBundlesLoadDataStore.pushAsync("broker2", getTopBundlesLoad("bundleB", 3, 1));
+ topBundlesLoadDataStore.pushAsync("broker3", getTopBundlesLoad("bundleC", 4, 2));
+ topBundlesLoadDataStore.pushAsync("broker4", getTopBundlesLoad("bundleD", 20, 60));
+ topBundlesLoadDataStore.pushAsync("broker5", getTopBundlesLoad("bundleE", 70, 20));
+ return ctx;
+ }
+
+    /**
+     * Creates a debug-enabled context with {@code clusterSize} brokers carrying random loads.
+     *
+     * <p>Each broker {@code broker<i>} gets a random CPU usage in [0, 100) and two top bundles whose
+     * loads add up to that usage.
+     *
+     * @param clusterSize number of brokers to generate
+     * @return the populated context
+     */
+    public LoadManagerContext setupContext(int clusterSize) {
+        var ctx = getContext();
+        ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
+
+        var brokerLoads = ctx.brokerLoadDataStore();
+        var topBundles = ctx.topBundleLoadDataStore();
+
+        Random random = new Random();
+        for (int idx = 0; idx < clusterSize; idx++) {
+            String broker = "broker" + idx;
+            int cpuLoad = random.nextInt(100);
+            int firstBundleLoad = random.nextInt(cpuLoad + 1);
+            int secondBundleLoad = cpuLoad - firstBundleLoad;
+
+            brokerLoads.pushAsync(broker, getCpuLoad(ctx, cpuLoad));
+            topBundles.pushAsync(broker,
+                    getTopBundlesLoad("bundle" + idx, firstBundleLoad, secondBundleLoad));
+        }
+        return ctx;
+    }
+
+    /**
+     * Builds broker load data whose only non-zero resource usage is the CPU.
+     *
+     * @param ctx context supplying the broker configuration passed to the load data update
+     * @param load CPU usage, against a limit of 100
+     * @return load data updated once with the given CPU usage
+     */
+    public BrokerLoadData getCpuLoad(LoadManagerContext ctx, int load) {
+        var brokerLoadData = new BrokerLoadData();
+
+        // Memory, direct memory and bandwidth stay at zero so that CPU is the only contributor.
+        SystemResourceUsage usage = new SystemResourceUsage();
+        usage.setCpu(new ResourceUsage(load, 100.0));
+        usage.setMemory(new ResourceUsage(0.0, 100.0));
+        usage.setDirectMemory(new ResourceUsage(0.0, 100.0));
+        usage.setBandwidthIn(new ResourceUsage(0.0, 100.0));
+        usage.setBandwidthOut(new ResourceUsage(0.0, 100.0));
+
+        brokerLoadData.update(usage, 1, 2, 3, 4, ctx.brokerConfiguration());
+        return brokerLoadData;
+    }
+
+    /**
+     * Builds top-bundles load data with two bundles, {@code <bundlePrefix>-1} and
+     * {@code <bundlePrefix>-2}.
+     *
+     * @param bundlePrefix prefix of the generated bundle names
+     * @param load1 outbound message throughput assigned to the first bundle
+     * @param load2 outbound message throughput assigned to the second bundle
+     * @return the load data for both bundles
+     */
+    public TopBundlesLoadData getTopBundlesLoad(String bundlePrefix, int load1, int load2) {
+        var firstStats = new NamespaceBundleStats();
+        firstStats.msgThroughputOut = load1;
+        var firstBundle = new TopBundlesLoadData.BundleLoadData(bundlePrefix + "-1", firstStats);
+
+        var secondStats = new NamespaceBundleStats();
+        secondStats.msgThroughputOut = load2;
+        var secondBundle = new TopBundlesLoadData.BundleLoadData(bundlePrefix + "-2", secondStats);
+
+        return TopBundlesLoadData.of(List.of(firstBundle, secondBundle), 2);
+    }
+
+    /**
+     * Builds top-bundles load data with a single bundle named {@code <bundlePrefix>-1}.
+     *
+     * @param bundlePrefix prefix of the generated bundle name
+     * @param load1 outbound message throughput assigned to the bundle
+     * @return the load data for the single bundle
+     */
+    public TopBundlesLoadData getTopBundlesLoad(String bundlePrefix, int load1) {
+        var stats = new NamespaceBundleStats();
+        stats.msgThroughputOut = load1;
+        var onlyBundle = new TopBundlesLoadData.BundleLoadData(bundlePrefix + "-1", stats);
+        return TopBundlesLoadData.of(List.of(onlyBundle), 2);
+    }
+
+    /**
+     * Creates a mocked {@link LoadManagerContext} backed by a fresh {@link ServiceConfiguration}
+     * and two empty in-memory load data stores (broker loads and top-bundle loads).
+     *
+     * @return the mocked context
+     */
+    public LoadManagerContext getContext() {
+        var ctx = mock(LoadManagerContext.class);
+        var conf = new ServiceConfiguration();
+        LoadDataStore<BrokerLoadData> brokerLoadDataStore = newInMemoryLoadDataStore();
+        LoadDataStore<TopBundlesLoadData> topBundleLoadDataStore = newInMemoryLoadDataStore();
+        doReturn(conf).when(ctx).brokerConfiguration();
+        doReturn(brokerLoadDataStore).when(ctx).brokerLoadDataStore();
+        doReturn(topBundleLoadDataStore).when(ctx).topBundleLoadDataStore();
+        return ctx;
+    }
+
+    /**
+     * Creates a {@link LoadDataStore} that keeps its entries in a local {@link HashMap}.
+     *
+     * <p>The asynchronous operations complete immediately and return an already-completed future
+     * rather than {@code null}, so callers may safely chain on the result.
+     *
+     * @param <T> type of the stored load data
+     * @return a new, empty store
+     */
+    private static <T> LoadDataStore<T> newInMemoryLoadDataStore() {
+        return new LoadDataStore<T>() {
+            private final Map<String, T> map = new HashMap<>();
+
+            @Override
+            public void close() throws IOException {
+                // Nothing to release for an in-memory store.
+            }
+
+            @Override
+            public CompletableFuture<Void> pushAsync(String key, T loadData) {
+                map.put(key, loadData);
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override
+            public CompletableFuture<Void> removeAsync(String key) {
+                map.remove(key);
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override
+            public Optional<T> get(String key) {
+                return Optional.ofNullable(map.get(key));
+            }
+
+            @Override
+            public void forEach(BiConsumer<String, T> action) {
+                map.forEach(action);
+            }
+
+            @Override
+            public Set<Map.Entry<String, T>> entrySet() {
+                return map.entrySet();
+            }
+
+            @Override
+            public int size() {
+                return map.size();
+            }
+        };
+    }
+
+    /** With no broker load data at all, the shedder skips with reason {@code NoBrokers}. */
+    @Test
+    public void testEmptyBrokerLoadData() {
+        var ctx = getContext();
+        ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
+
+        var actual = new TransferShedder().findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var skipped = new UnloadDecision();
+        skipped.setLabel(Skip);
+        skipped.setReason(NoBrokers);
+        assertEquals(actual, skipped);
+    }
+
+    /** Broker loads without any top-bundle data lead to a skip with reason {@code NoLoadData}. */
+    @Test
+    public void testEmptyTopBundlesLoadData() {
+        var ctx = getContext();
+        ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
+
+        // Only broker loads are pushed; the top-bundles store stays empty.
+        var brokerLoads = ctx.brokerLoadDataStore();
+        brokerLoads.pushAsync("broker1", getCpuLoad(ctx, 90));
+        brokerLoads.pushAsync("broker2", getCpuLoad(ctx, 10));
+        brokerLoads.pushAsync("broker3", getCpuLoad(ctx, 20));
+
+        var actual = new TransferShedder().findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var skipped = new UnloadDecision();
+        skipped.setLabel(Skip);
+        skipped.setReason(NoLoadData);
+        skipped.setLoadAvg(0.39999999999999997);
+        skipped.setLoadStd(0.35590260840104376);
+        assertEquals(actual, skipped);
+    }
+
+    /** Once every broker's load data has its {@code updatedAt} reset to 0, the shedder skips. */
+    @Test
+    public void testOutDatedLoadData() throws IllegalAccessException {
+        var shedder = new TransferShedder();
+        var ctx = setupContext();
+        var brokerLoads = ctx.brokerLoadDataStore();
+
+        // With freshly pushed load data the default fixture yields two unloads.
+        var decision = shedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+        assertEquals(decision.getUnloads().size(), 2);
+
+        // Reset the update timestamp of every broker's load data to 0.
+        for (String broker : List.of("broker1", "broker2", "broker3", "broker4", "broker5")) {
+            FieldUtils.writeDeclaredField(brokerLoads.get(broker).get(), "updatedAt", 0, true);
+        }
+
+        decision = shedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var skipped = new UnloadDecision();
+        skipped.setLabel(Skip);
+        skipped.setReason(OutDatedData);
+        assertEquals(decision, skipped);
+    }
+
+    /**
+     * A broker whose last unload is older than the load data TTL is still used as a target, while a
+     * broker unloaded just now makes the shedder skip with reason {@code CoolDown}.
+     */
+    @Test
+    public void testRecentlyUnloadedBrokers() {
+        var shedder = new TransferShedder();
+        var ctx = setupContext();
+        Map<String, Long> recentlyUnloadedBrokers = new HashMap<>();
+
+        // TTL seconds * 1001 ms lies just beyond the TTL expressed in milliseconds.
+        var staleTimestamp = System.currentTimeMillis() - ctx.brokerConfiguration()
+                .getLoadBalancerBrokerLoadDataTTLInSeconds() * 1001;
+        recentlyUnloadedBrokers.put("broker1", staleTimestamp);
+        var decision = shedder.findBundlesForUnloading(ctx, Map.of(), recentlyUnloadedBrokers);
+
+        var transferred = new UnloadDecision();
+        transferred.getUnloads().put("broker5",
+                new Unload("broker5", "bundleE-1", Optional.of("broker1")));
+        transferred.getUnloads().put("broker4",
+                new Unload("broker4", "bundleD-2", Optional.of("broker2")));
+        transferred.setLabel(Success);
+        transferred.setReason(Overloaded);
+        transferred.setLoadAvg(setupLoadAvg);
+        transferred.setLoadStd(setupLoadStd);
+        assertEquals(decision, transferred);
+
+        // Mark broker1 as unloaded right now and run another cycle.
+        var currentTimestamp = System.currentTimeMillis();
+        recentlyUnloadedBrokers.put("broker1", currentTimestamp);
+        decision = shedder.findBundlesForUnloading(ctx, Map.of(), recentlyUnloadedBrokers);
+
+        var coolingDown = new UnloadDecision();
+        coolingDown.setLabel(Skip);
+        coolingDown.setReason(CoolDown);
+        assertEquals(decision, coolingDown);
+    }
+
+    /** When all bundles of broker4 and broker5 were unloaded just now, nothing is left to move. */
+    @Test
+    public void testRecentlyUnloadedBundles() {
+        var shedder = new TransferShedder();
+        var ctx = setupContext();
+
+        var currentTimestamp = System.currentTimeMillis();
+        Map<String, Long> recentlyUnloadedBundles = new HashMap<>();
+        for (String bundle : List.of("bundleE-1", "bundleE-2", "bundleD-1", "bundleD-2")) {
+            recentlyUnloadedBundles.put(bundle, currentTimestamp);
+        }
+
+        var decision = shedder.findBundlesForUnloading(ctx, recentlyUnloadedBundles, Map.of());
+
+        var skipped = new UnloadDecision();
+        skipped.setLabel(Skip);
+        skipped.skip(NoBundles);
+        skipped.setLoadAvg(setupLoadAvg);
+        skipped.setLoadStd(setupLoadStd);
+        assertEquals(decision, skipped);
+    }
+
+    /** Bundles whose namespaces report isolation policies are never selected for transfer. */
+    @Test
+    public void testBundlesWithIsolationPolicies() throws IllegalAccessException {
+        var pulsar = mock(PulsarService.class);
+        var shedder = new TransferShedder(pulsar);
+
+        // Swap in a spy that reports isolation policies for every namespace.
+        var policiesSpy = (SimpleResourceAllocationPolicies)
+                spy(FieldUtils.readDeclaredField(shedder, "allocationPolicies", true));
+        doReturn(true).when(policiesSpy).areIsolationPoliciesPresent(any());
+        FieldUtils.writeDeclaredField(shedder, "allocationPolicies", policiesSpy, true);
+
+        var ctx = setupContext();
+        var topBundles = ctx.topBundleLoadDataStore();
+        topBundles.pushAsync("broker1",
+                getTopBundlesLoad("my-tenant/my-namespaceA/0x00000000_0xFFFFFFF", 1, 3));
+        topBundles.pushAsync("broker2",
+                getTopBundlesLoad("my-tenant/my-namespaceB/0x00000000_0xFFFFFFF", 2, 8));
+        topBundles.pushAsync("broker3",
+                getTopBundlesLoad("my-tenant/my-namespaceC/0x00000000_0xFFFFFFF", 6, 10));
+        topBundles.pushAsync("broker4",
+                getTopBundlesLoad("my-tenant/my-namespaceD/0x00000000_0xFFFFFFF", 10, 20));
+        topBundles.pushAsync("broker5",
+                getTopBundlesLoad("my-tenant/my-namespaceE/0x00000000_0xFFFFFFF", 70, 20));
+
+        var decision = shedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var skipped = new UnloadDecision();
+        skipped.setLabel(Skip);
+        skipped.skip(NoBundles);
+        skipped.setLoadAvg(setupLoadAvg);
+        skipped.setLoadStd(setupLoadStd);
+        assertEquals(decision, skipped);
+    }
+
+    /** Bundles whose namespaces have an anti-affinity group are never selected for transfer. */
+    @Test
+    public void testBundlesWithAntiAffinityGroup() throws IllegalAccessException, MetadataStoreException {
+        var pulsar = mock(PulsarService.class);
+        var shedder = new TransferShedder(pulsar);
+
+        // No isolation policies, so only the anti-affinity check can reject a bundle.
+        var policiesSpy = (SimpleResourceAllocationPolicies)
+                spy(FieldUtils.readDeclaredField(shedder, "allocationPolicies", true));
+        doReturn(false).when(policiesSpy).areIsolationPoliciesPresent(any());
+        FieldUtils.writeDeclaredField(shedder, "allocationPolicies", policiesSpy, true);
+
+        // Every namespace lookup returns local policies carrying an anti-affinity group.
+        var resources = mock(PulsarResources.class);
+        var localPoliciesResources = mock(LocalPoliciesResources.class);
+        doReturn(resources).when(pulsar).getPulsarResources();
+        doReturn(localPoliciesResources).when(resources).getLocalPolicies();
+        LocalPolicies policiesWithGroup = new LocalPolicies(null, null, "namespaceAntiAffinityGroup");
+        doReturn(Optional.of(policiesWithGroup)).when(localPoliciesResources).getLocalPolicies(any());
+
+        var ctx = setupContext();
+        var topBundles = ctx.topBundleLoadDataStore();
+        topBundles.pushAsync("broker1",
+                getTopBundlesLoad("my-tenant/my-namespaceA/0x00000000_0xFFFFFFF", 1, 3));
+        topBundles.pushAsync("broker2",
+                getTopBundlesLoad("my-tenant/my-namespaceB/0x00000000_0xFFFFFFF", 2, 8));
+        topBundles.pushAsync("broker3",
+                getTopBundlesLoad("my-tenant/my-namespaceC/0x00000000_0xFFFFFFF", 6, 10));
+        topBundles.pushAsync("broker4",
+                getTopBundlesLoad("my-tenant/my-namespaceD/0x00000000_0xFFFFFFF", 10, 20));
+        topBundles.pushAsync("broker5",
+                getTopBundlesLoad("my-tenant/my-namespaceE/0x00000000_0xFFFFFFF", 70, 20));
+
+        var decision = shedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var skipped = new UnloadDecision();
+        skipped.setLabel(Skip);
+        skipped.skip(NoBundles);
+        skipped.setLoadAvg(setupLoadAvg);
+        skipped.setLoadStd(setupLoadStd);
+        assertEquals(decision, skipped);
+    }
+
+    /** Three brokers at 10, 20 and 30 CPU are reported as {@code Balanced} and nothing is moved. */
+    @Test
+    public void testTargetStd() {
+        var shedder = new TransferShedder();
+        var ctx = getContext();
+        ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
+
+        var brokerLoads = ctx.brokerLoadDataStore();
+        brokerLoads.pushAsync("broker1", getCpuLoad(ctx, 10));
+        brokerLoads.pushAsync("broker2", getCpuLoad(ctx, 20));
+        brokerLoads.pushAsync("broker3", getCpuLoad(ctx, 30));
+
+        var topBundles = ctx.topBundleLoadDataStore();
+        topBundles.pushAsync("broker1", getTopBundlesLoad("bundleA", 30, 30));
+        topBundles.pushAsync("broker2", getTopBundlesLoad("bundleB", 40, 40));
+        topBundles.pushAsync("broker3", getTopBundlesLoad("bundleC", 50, 50));
+
+        var decision = shedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var balanced = new UnloadDecision();
+        balanced.setLabel(Skip);
+        balanced.skip(Balanced);
+        balanced.setLoadAvg(0.2000000063578288);
+        balanced.setLoadStd(0.08164966587949089);
+        assertEquals(decision, balanced);
+    }
+
+    /** When every broker reports only a single top bundle, the shedder skips with {@code NoBundles}. */
+    @Test
+    public void testSingleTopBundlesLoadData() {
+        var shedder = new TransferShedder();
+        var ctx = setupContext();
+
+        // Overwrite the default two-bundle entries with single-bundle entries.
+        var topBundles = ctx.topBundleLoadDataStore();
+        topBundles.pushAsync("broker1", getTopBundlesLoad("bundleA", 1));
+        topBundles.pushAsync("broker2", getTopBundlesLoad("bundleB", 2));
+        topBundles.pushAsync("broker3", getTopBundlesLoad("bundleC", 6));
+        topBundles.pushAsync("broker4", getTopBundlesLoad("bundleD", 10));
+        topBundles.pushAsync("broker5", getTopBundlesLoad("bundleE", 70));
+
+        var decision = shedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var skipped = new UnloadDecision();
+        skipped.setLabel(Skip);
+        skipped.skip(NoBundles);
+        skipped.setLoadAvg(setupLoadAvg);
+        skipped.setLoadStd(setupLoadStd);
+        assertEquals(decision, skipped);
+    }
+
+
+    /** With broker4 and broker5 lowered to 55 and 65, a single transfer from broker5 is expected. */
+    @Test
+    public void testTargetStdAfterTransfer() {
+        var shedder = new TransferShedder();
+        var ctx = setupContext();
+
+        // Replace the loads of the two busiest brokers of the default fixture.
+        var brokerLoads = ctx.brokerLoadDataStore();
+        brokerLoads.pushAsync("broker4", getCpuLoad(ctx, 55));
+        brokerLoads.pushAsync("broker5", getCpuLoad(ctx, 65));
+
+        var decision = shedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var transferred = new UnloadDecision();
+        transferred.getUnloads().put("broker5",
+                new Unload("broker5", "bundleE-1", Optional.of("broker1")));
+        transferred.setLabel(Success);
+        transferred.setReason(Overloaded);
+        transferred.setLoadAvg(0.26400000000000007);
+        transferred.setLoadStd(0.27644891028904417);
+        assertEquals(decision, transferred);
+    }
+
+    /**
+     * When broker2 reports zero inbound and outbound message throughput, two transfers are expected
+     * and the decision reason is {@code Underloaded}.
+     */
+    @Test
+    public void testMinBrokerWithZeroTraffic() throws IllegalAccessException {
+        var shedder = new TransferShedder();
+        var ctx = setupContext();
+
+        var brokerLoads = ctx.brokerLoadDataStore();
+        brokerLoads.pushAsync("broker4", getCpuLoad(ctx, 55));
+        brokerLoads.pushAsync("broker5", getCpuLoad(ctx, 65));
+
+        // Force broker2's throughput counters to zero via reflection.
+        var idleLoad = getCpuLoad(ctx, 4);
+        FieldUtils.writeDeclaredField(idleLoad, "msgThroughputIn", 0, true);
+        FieldUtils.writeDeclaredField(idleLoad, "msgThroughputOut", 0, true);
+        brokerLoads.pushAsync("broker2", idleLoad);
+
+        var decision = shedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var transferred = new UnloadDecision();
+        transferred.getUnloads().put("broker5",
+                new Unload("broker5", "bundleE-1", Optional.of("broker1")));
+        transferred.getUnloads().put("broker4",
+                new Unload("broker4", "bundleD-2", Optional.of("broker2")));
+        transferred.setLabel(Success);
+        transferred.setReason(Underloaded);
+        transferred.setLoadAvg(0.26400000000000007);
+        transferred.setLoadStd(0.27644891028904417);
+        assertEquals(decision, transferred);
+    }
+
+    /** Raising the per-cycle transfer limit to 10 still yields the same two transfers. */
+    @Test
+    public void testMaxNumberOfTransfersPerShedderCycle() {
+        var shedder = new TransferShedder();
+        var ctx = setupContext();
+        ctx.brokerConfiguration().setLoadBalancerMaxNumberOfBrokerTransfersPerCycle(10);
+
+        var decision = shedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var transferred = new UnloadDecision();
+        transferred.getUnloads().put("broker5",
+                new Unload("broker5", "bundleE-1", Optional.of("broker1")));
+        transferred.getUnloads().put("broker4",
+                new Unload("broker4", "bundleD-2", Optional.of("broker2")));
+        transferred.setLabel(Success);
+        transferred.setReason(Overloaded);
+        transferred.setLoadAvg(setupLoadAvg);
+        transferred.setLoadStd(setupLoadStd);
+        assertEquals(decision, transferred);
+    }
+
+    /** With broker5's top bundles replaced by two bundles of load 20, the same transfers result. */
+    @Test
+    public void testRemainingTopBundles() {
+        var shedder = new TransferShedder();
+        var ctx = setupContext();
+        ctx.topBundleLoadDataStore().pushAsync("broker5", getTopBundlesLoad("bundleE", 20, 20));
+
+        var decision = shedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        var transferred = new UnloadDecision();
+        transferred.getUnloads().put("broker5",
+                new Unload("broker5", "bundleE-1", Optional.of("broker1")));
+        transferred.getUnloads().put("broker4",
+                new Unload("broker4", "bundleD-2", Optional.of("broker2")));
+        transferred.setLabel(Success);
+        transferred.setReason(Overloaded);
+        transferred.setLoadAvg(setupLoadAvg);
+        transferred.setLoadStd(setupLoadStd);
+        assertEquals(decision, transferred);
+    }
+
+ @Test
+ public void testRandomLoad() throws IllegalAccessException {
+ TransferShedder transferShedder = new TransferShedder();
+ for (int i = 0; i < 5; i++) {
+ var ctx = setupContext(10);
+ var conf = ctx.brokerConfiguration();
+ transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+ var stats = (TransferShedder.LoadStats)
+ FieldUtils.readDeclaredField(transferShedder, "stats", true);
+ assertTrue(stats.std() <= conf.getLoadBalancerBrokerLoadTargetStd()
+ || (!stats.hasTransferableBrokers()));
+ }
+ }
+
+    /**
+     * Verifies {@code TransferShedder.LoadStats} against a reference computation.
+     *
+     * <p>For five random ten-broker clusters, the broker loads are sorted locally and, for each
+     * allowed transfer of the cycle, the stats' current minimum/maximum broker, broker count,
+     * average and standard deviation are compared with values computed from the sorted array.
+     * Half of the max-min gap is then offloaded both in the stats and in the local array.
+     */
+    @Test
+    public void testLoadStats() {
+        int numBrokers = 10;
+        double delta = 0.0001;
+        for (int t = 0; t < 5; t++) {
+            var ctx = setupContext(numBrokers);
+            var conf = ctx.brokerConfiguration();
+            var loadStore = ctx.brokerLoadDataStore();
+
+            TransferShedder.LoadStats stats = new TransferShedder.LoadStats();
+            stats.setLoadDataStore(loadStore);
+            stats.update(loadStore, Map.of(), conf);
+
+            double[] loads = new double[numBrokers];
+            for (int b = 0; b < loads.length; b++) {
+                loads[b] = loadStore.get("broker" + b).get().getWeightedMaxEMA();
+            }
+            Arrays.sort(loads);
+
+            // i walks up from the least loaded broker, j walks down from the most loaded one.
+            int i = 0;
+            int j = loads.length - 1;
+            for (int k = 0; k < conf.getLoadBalancerMaxNumberOfBrokerTransfersPerCycle(); k++) {
+                double minLoad = loads[i];
+                double maxLoad = loads[j];
+                double offload = (maxLoad - minLoad) / 2;
+                Mean mean = new Mean();
+                StandardDeviation std = new StandardDeviation(false);
+
+                // TestNG's assertEquals takes (actual, expected).
+                assertEquals(loadStore.get(stats.minBrokers().pollLast()).get().getWeightedMaxEMA(),
+                        minLoad);
+                assertEquals(loadStore.get(stats.maxBrokers().pollLast()).get().getWeightedMaxEMA(),
+                        maxLoad);
+                assertEquals(stats.totalBrokers(), numBrokers);
+                assertEquals(stats.avg(), mean.evaluate(loads), delta);
+                assertEquals(stats.std(), std.evaluate(loads), delta);
+
+                stats.offload(maxLoad, minLoad, offload);
+                loads[i++] = minLoad + offload;
+                loads[j--] = maxLoad - offload;
+            }
+        }
+    }
+}