You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/04 12:37:06 UTC
[pulsar] branch branch-2.10 updated: [fix][load-balancer] skip mis-configured resource usage(>100%) in load computation (#16937)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 688fb76696b [fix][load-balancer] skip mis-configured resource usage(>100%) in load computation (#16937)
688fb76696b is described below
commit 688fb76696bb18dd65531953b872a1106b19f406
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Wed Aug 10 17:25:35 2022 -0700
[fix][load-balancer] skip mis-configured resource usage(>100%) in load computation (#16937)
(cherry picked from commit eb833e6ffed472a82e4b2ea5ce6271c59246e119)
---
.../loadbalance/impl/ModularLoadManagerImpl.java | 10 +--
.../broker/loadbalance/impl/ThresholdShedder.java | 76 +++++++++++++++++++---
.../loadbalance/impl/ThresholdShedderTest.java | 6 ++
.../data/loadbalancer/LocalBrokerData.java | 20 ++++++
4 files changed, 99 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 65c11b76464..261548baa0d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -447,8 +447,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
long timeSinceLastReportWrittenToStore = System.currentTimeMillis() - localData.getLastUpdate();
if (timeSinceLastReportWrittenToStore > updateMaxIntervalMillis) {
log.info("Writing local data to metadata store because time since last"
- + " update exceeded threshold of {} minutes",
- conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
+ + " update exceeded threshold of {} minutes. ResourceUsage:[{}]",
+ conf.getLoadBalancerReportUpdateMaxIntervalMinutes(),
+ localData.printResourceUsage());
// Always update after surpassing the maximum interval.
return true;
}
@@ -462,9 +463,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
percentChange(lastData.getNumBundles(), localData.getNumBundles()))));
if (maxChange > conf.getLoadBalancerReportUpdateThresholdPercentage()) {
log.info("Writing local data to metadata store because maximum change {}% exceeded threshold {}%; "
- + "time since last report written is {} seconds", maxChange,
+ + "time since last report written is {} seconds. ResourceUsage:[{}]", maxChange,
conf.getLoadBalancerReportUpdateThresholdPercentage(),
- timeSinceLastReportWrittenToStore / 1000.0);
+ timeSinceLastReportWrittenToStore / 1000.0,
+ localData.printResourceUsage());
return true;
}
return false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index 62a29b9f22d..680022179c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -52,16 +52,38 @@ public class ThresholdShedder implements LoadSheddingStrategy {
private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
private static final double MB = 1024 * 1024;
+
+ private static final long LOAD_LOG_SAMPLE_DELAY_IN_SEC = 5 * 60; // 5 mins
private final Map<String, Double> brokerAvgResourceUsage = new HashMap<>();
+ private long lastSampledLoadLogTS = 0;
+
+
+ private static int toPercentage(double usage) {
+ return (int) (usage * 100);
+ }
+
+ private boolean canSampleLog() {
+ long now = System.currentTimeMillis() / 1000;
+ boolean sampleLog = now - lastSampledLoadLogTS >= LOAD_LOG_SAMPLE_DELAY_IN_SEC;
+ if (sampleLog) {
+ lastSampledLoadLogTS = now;
+ }
+ return sampleLog;
+ }
@Override
public Multimap<String, String> findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) {
selectedBundlesCache.clear();
+ boolean sampleLog = canSampleLog();
final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
final double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
- final double avgUsage = getBrokerAvgUsage(loadData, conf.getLoadBalancerHistoryResourcePercentage(), conf);
+ final double avgUsage = getBrokerAvgUsage(
+ loadData, conf.getLoadBalancerHistoryResourcePercentage(), conf, sampleLog);
+ if (sampleLog) {
+ log.info("brokers' resource avgUsage:{}%", toPercentage(avgUsage));
+ }
if (avgUsage == 0) {
log.warn("average max resource usage is 0");
@@ -73,8 +95,9 @@ public class ThresholdShedder implements LoadSheddingStrategy {
final double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
if (currentUsage < avgUsage + threshold) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] broker is not overloaded, ignoring at this point", broker);
+ if (sampleLog) {
+ log.info("[{}] broker is not overloaded, ignoring at this point, currentUsage:{}%",
+ broker, toPercentage(currentUsage));
}
return;
}
@@ -85,14 +108,13 @@ public class ThresholdShedder implements LoadSheddingStrategy {
double minimumThroughputToOffload = brokerCurrentThroughput * percentOfTrafficToOffload;
if (minimumThroughputToOffload < minThroughputThreshold) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] broker is planning to shed throughput {} MByte/s less than "
+ if (sampleLog) {
+ log.info("[{}] broker is planning to shed throughput {} MByte/s less than "
+ "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
broker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
}
return;
}
-
log.info(
"Attempting to shed load on {}, which has max resource usage above avgUsage and threshold {}%"
+ " > {}% + {}% -- Offloading at least {} MByte/s of traffic, left throughput {} MByte/s",
@@ -138,14 +160,14 @@ public class ThresholdShedder implements LoadSheddingStrategy {
}
private double getBrokerAvgUsage(final LoadData loadData, final double historyPercentage,
- final ServiceConfiguration conf) {
+ final ServiceConfiguration conf, boolean sampleLog) {
double totalUsage = 0.0;
int totalBrokers = 0;
for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
LocalBrokerData localBrokerData = entry.getValue().getLocalData();
String broker = entry.getKey();
- totalUsage += updateAvgResourceUsage(broker, localBrokerData, historyPercentage, conf);
+ totalUsage += updateAvgResourceUsage(broker, localBrokerData, historyPercentage, conf, sampleLog);
totalBrokers++;
}
@@ -153,7 +175,8 @@ public class ThresholdShedder implements LoadSheddingStrategy {
}
private double updateAvgResourceUsage(String broker, LocalBrokerData localBrokerData,
- final double historyPercentage, final ServiceConfiguration conf) {
+ final double historyPercentage, final ServiceConfiguration conf,
+ boolean sampleLog) {
Double historyUsage =
brokerAvgResourceUsage.get(broker);
double resourceUsage = localBrokerData.getMaxResourceUsageWithWeight(
@@ -161,6 +184,41 @@ public class ThresholdShedder implements LoadSheddingStrategy {
conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwithInResourceWeight(),
conf.getLoadBalancerBandwithOutResourceWeight());
+
+ if (sampleLog) {
+ log.info("{} broker load: historyUsage={}%, resourceUsage={}%",
+ broker,
+ historyUsage == null ? 0 : toPercentage(historyUsage),
+ toPercentage(resourceUsage));
+ }
+
+ // if resourceUsage is bigger than 1.0, recompute it while skipping the usage signals above 100%
+ if (resourceUsage > 1.0) {
+ log.error("{} broker resourceUsage is bigger than 100%. "
+ + "Some of the resource limits are mis-configured. "
+ + "Try to disable the error resource signals by setting their weights to zero "
+ + "or fix the resource limit configurations. "
+ + "Ref:https://pulsar.apache.org/docs/administration-load-balance/#thresholdshedder "
+ + "ResourceUsage:[{}], "
+ + "CPUResourceWeight:{}, MemoryResourceWeight:{}, DirectMemoryResourceWeight:{}, "
+ + "BandwithInResourceWeight:{}, BandwithOutResourceWeight:{}",
+ broker,
+ localBrokerData.printResourceUsage(),
+ conf.getLoadBalancerCPUResourceWeight(),
+ conf.getLoadBalancerMemoryResourceWeight(),
+ conf.getLoadBalancerDirectMemoryResourceWeight(),
+ conf.getLoadBalancerBandwithInResourceWeight(),
+ conf.getLoadBalancerBandwithOutResourceWeight());
+
+ resourceUsage = localBrokerData.getMaxResourceUsageWithWeightWithinLimit(
+ conf.getLoadBalancerCPUResourceWeight(),
+ conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+ conf.getLoadBalancerBandwithInResourceWeight(),
+ conf.getLoadBalancerBandwithOutResourceWeight());
+
+ log.warn("{} broker recomputed max resourceUsage={}%. Skipped usage signals bigger than 100%",
+ broker, toPercentage(resourceUsage));
+ }
historyUsage = historyUsage == null
? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
index 4f6ad7f8c98..eda4fdd859f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java
@@ -73,6 +73,9 @@ public class ThresholdShedderTest {
LoadData loadData = new LoadData();
LocalBrokerData broker1 = new LocalBrokerData();
+ broker1.setCpu(new ResourceUsage(1000, 100));
+ broker1.setMemory(new ResourceUsage(5000, 100));
+ broker1.setDirectMemory(new ResourceUsage(5000, 100));
broker1.setBandwidthIn(new ResourceUsage(500, 1000));
broker1.setBandwidthOut(new ResourceUsage(500, 1000));
broker1.setBundles(Sets.newHashSet("bundle-1"));
@@ -115,6 +118,9 @@ public class ThresholdShedderTest {
LoadData loadData = new LoadData();
LocalBrokerData broker1 = new LocalBrokerData();
+ broker1.setCpu(new ResourceUsage(1000, 100));
+ broker1.setMemory(new ResourceUsage(5000, 100));
+ broker1.setDirectMemory(new ResourceUsage(5000, 100));
broker1.setBandwidthIn(new ResourceUsage(999, 1000));
broker1.setBandwidthOut(new ResourceUsage(999, 1000));
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index e63de9b0dca..8736b675272 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -255,6 +255,16 @@ public class LocalBrokerData implements LoadManagerReport {
bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
}
+ public double getMaxResourceUsageWithWeightWithinLimit(final double cpuWeight, final double memoryWeight,
+ final double directMemoryWeight,
+ final double bandwidthInWeight,
+ final double bandwidthOutWeight) {
+ return maxWithinLimit(100.0d,
+ cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+ directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+ bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+ }
+
private static double max(double... args) {
double max = Double.NEGATIVE_INFINITY;
@@ -279,6 +289,16 @@ public class LocalBrokerData implements LoadManagerReport {
return max;
}
+ private static double maxWithinLimit(double limit, double...args) {
+ double max = 0.0;
+ for (double d : args) {
+ if (d > max && d <= limit) {
+ max = d;
+ }
+ }
+ return max;
+ }
+
public String getLoadReportType() {
return loadReportType;
}