You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by kw...@apache.org on 2022/09/01 02:28:24 UTC

[pulsar] branch branch-2.9 updated: [branch-2.9][fix][load-balancer] skip mis-configured resource usage(>100%) in load computation (#16937) (#17285)

This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new a9c90ef0686 [branch-2.9][fix][load-balancer] skip mis-configured resource usage(>100%) in load computation (#16937) (#17285)
a9c90ef0686 is described below

commit a9c90ef06867fd0842fbf3339b2c2495ba5e8ea6
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Wed Aug 31 19:28:16 2022 -0700

    [branch-2.9][fix][load-balancer] skip mis-configured resource usage(>100%) in load computation (#16937) (#17285)
    
    ### Motivation
    
    Cherry-pick this commit to branch-2.9, as requested in: https://github.com/apache/pulsar/pull/16937#issuecomment-1227036563
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 10 +--
 .../broker/loadbalance/impl/ThresholdShedder.java  | 74 +++++++++++++++++++---
 .../data/loadbalancer/LocalBrokerData.java         | 20 ++++++
 3 files changed, 92 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 67a1ecc6f6c..6191078569c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -427,8 +427,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
         long timeSinceLastReportWrittenToStore = System.currentTimeMillis() - localData.getLastUpdate();
         if (timeSinceLastReportWrittenToStore > updateMaxIntervalMillis) {
             log.info("Writing local data to metadata store because time since last"
-                            + " update exceeded threshold of {} minutes",
-                    conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
+                            + " update exceeded threshold of {} minutes. ResourceUsage:[{}]",
+                    conf.getLoadBalancerReportUpdateMaxIntervalMinutes(),
+                    localData.printResourceUsage());
             // Always update after surpassing the maximum interval.
             return true;
         }
@@ -442,9 +443,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
                                         percentChange(lastData.getNumBundles(), localData.getNumBundles()))));
         if (maxChange > conf.getLoadBalancerReportUpdateThresholdPercentage()) {
             log.info("Writing local data to metadata store because maximum change {}% exceeded threshold {}%; "
-                            + "time since last report written is {} seconds", maxChange,
+                            + "time since last report written is {} seconds. ResourceUsage:[{}]", maxChange,
                     conf.getLoadBalancerReportUpdateThresholdPercentage(),
-                    timeSinceLastReportWrittenToStore / 1000.0);
+                    timeSinceLastReportWrittenToStore / 1000.0,
+                    localData.printResourceUsage());
             return true;
         }
         return false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index afca7084617..44e61de0b97 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -52,16 +52,38 @@ public class ThresholdShedder implements LoadSheddingStrategy {
     private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
     private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
     private static final double MB = 1024 * 1024;
+
+    private static final long LOAD_LOG_SAMPLE_DELAY_IN_SEC = 5 * 60; // 5 mins
     private final Map<String, Double> brokerAvgResourceUsage = new HashMap<>();
+    private long lastSampledLoadLogTS = 0;
+
+
+    private static int toPercentage(double usage) {
+        return (int) (usage * 100);
+    }
+
+    private boolean canSampleLog() {
+        long now = System.currentTimeMillis() / 1000;
+        boolean sampleLog = now - lastSampledLoadLogTS >= LOAD_LOG_SAMPLE_DELAY_IN_SEC;
+        if (sampleLog) {
+            lastSampledLoadLogTS = now;
+        }
+        return sampleLog;
+    }
 
     @Override
     public Multimap<String, String> findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) {
         selectedBundlesCache.clear();
+        boolean sampleLog = canSampleLog();
         final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
         final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
         final double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
 
-        final double avgUsage = getBrokerAvgUsage(loadData, conf.getLoadBalancerHistoryResourcePercentage(), conf);
+        final double avgUsage = getBrokerAvgUsage(
+                loadData, conf.getLoadBalancerHistoryResourcePercentage(), conf, sampleLog);
+        if (sampleLog) {
+            log.info("brokers' resource avgUsage:{}%", toPercentage(avgUsage));
+        }
 
         if (avgUsage == 0) {
             log.warn("average max resource usage is 0");
@@ -73,8 +95,9 @@ public class ThresholdShedder implements LoadSheddingStrategy {
             final double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
 
             if (currentUsage < avgUsage + threshold) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] broker is not overloaded, ignoring at this point", broker);
+                if (sampleLog) {
+                    log.info("[{}] broker is not overloaded, ignoring at this point, currentUsage:{}%",
+                            broker, toPercentage(currentUsage));
                 }
                 return;
             }
@@ -85,14 +108,13 @@ public class ThresholdShedder implements LoadSheddingStrategy {
             double minimumThroughputToOffload = brokerCurrentThroughput * percentOfTrafficToOffload;
 
             if (minimumThroughputToOffload < minThroughputThreshold) {
-                if (log.isDebugEnabled()) {
+                if (sampleLog) {
                     log.info("[{}] broker is planning to shed throughput {} MByte/s less than "
                                     + "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
                             broker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
                 }
                 return;
             }
-
             log.info(
                     "Attempting to shed load on {}, which has max resource usage above avgUsage  and threshold {}%"
                             + " > {}% + {}% -- Offloading at least {} MByte/s of traffic, left throughput {} MByte/s",
@@ -138,14 +160,14 @@ public class ThresholdShedder implements LoadSheddingStrategy {
     }
 
     private double getBrokerAvgUsage(final LoadData loadData, final double historyPercentage,
-                                     final ServiceConfiguration conf) {
+                                     final ServiceConfiguration conf, boolean sampleLog) {
         double totalUsage = 0.0;
         int totalBrokers = 0;
 
         for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
             LocalBrokerData localBrokerData = entry.getValue().getLocalData();
             String broker = entry.getKey();
-            totalUsage += updateAvgResourceUsage(broker, localBrokerData, historyPercentage, conf);
+            totalUsage += updateAvgResourceUsage(broker, localBrokerData, historyPercentage, conf, sampleLog);
             totalBrokers++;
         }
 
@@ -153,7 +175,8 @@ public class ThresholdShedder implements LoadSheddingStrategy {
     }
 
     private double updateAvgResourceUsage(String broker, LocalBrokerData localBrokerData,
-                                          final double historyPercentage, final ServiceConfiguration conf) {
+                                          final double historyPercentage, final ServiceConfiguration conf,
+                                          boolean sampleLog) {
         Double historyUsage =
                 brokerAvgResourceUsage.get(broker);
         double resourceUsage = localBrokerData.getMaxResourceUsageWithWeight(
@@ -161,6 +184,41 @@ public class ThresholdShedder implements LoadSheddingStrategy {
                 conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
                 conf.getLoadBalancerBandwithInResourceWeight(),
                 conf.getLoadBalancerBandwithOutResourceWeight());
+
+        if (sampleLog) {
+            log.info("{} broker load: historyUsage={}%, resourceUsage={}%",
+                    broker,
+                    historyUsage == null ? 0 : toPercentage(historyUsage),
+                    toPercentage(resourceUsage));
+        }
+
+        // wrap if resourceUsage is bigger than 1.0
+        if (resourceUsage > 1.0) {
+            log.error("{} broker resourceUsage is bigger than 100%. "
+                            + "Some of the resource limits are mis-configured. "
+                            + "Try to disable the error resource signals by setting their weights to zero "
+                            + "or fix the resource limit configurations. "
+                            + "Ref:https://pulsar.apache.org/docs/administration-load-balance/#thresholdshedder "
+                            + "ResourceUsage:[{}], "
+                            + "CPUResourceWeight:{}, MemoryResourceWeight:{}, DirectMemoryResourceWeight:{}, "
+                            + "BandwithInResourceWeight:{}, BandwithOutResourceWeight:{}",
+                    broker,
+                    localBrokerData.printResourceUsage(),
+                    conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(),
+                    conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+
+            resourceUsage = localBrokerData.getMaxResourceUsageWithWeightWithinLimit(
+                    conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+
+            log.warn("{} broker recomputed max resourceUsage={}%. Skipped usage signals bigger than 100%",
+                    broker, toPercentage(resourceUsage));
+        }
         historyUsage = historyUsage == null
                 ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index 595bbe9d0d2..a503a851715 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -255,6 +255,16 @@ public class LocalBrokerData implements LoadManagerReport {
                 bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
     }
 
+    public double getMaxResourceUsageWithWeightWithinLimit(final double cpuWeight, final double memoryWeight,
+                                                           final double directMemoryWeight,
+                                                           final double bandwidthInWeight,
+                                                           final double bandwidthOutWeight) {
+        return maxWithinLimit(100.0d,
+                cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
     private static double max(double... args) {
         double max = Double.NEGATIVE_INFINITY;
 
@@ -279,6 +289,16 @@ public class LocalBrokerData implements LoadManagerReport {
         return max;
     }
 
+    private static double maxWithinLimit(double limit, double...args) {
+        double max = 0.0;
+        for (double d : args) {
+            if (d > max && d <= limit) {
+                max = d;
+            }
+        }
+        return max;
+    }
+
     public String getLoadReportType() {
         return loadReportType;
     }