You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/13 15:08:03 UTC

[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

codelipenghui commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r969740541


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in RangeThresholdShedder is planning to shed throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
+            return;
+        }
+        filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
+                minimumThroughputToOffload);
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = -1;
+        boolean hasBrokerBelowLowerBound = false;
+        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
+            String broker = entry.getKey();
+            double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+            // Select the broker with the most resource usage.
+            if (currentUsage > maxUsage) {

Review Comment:
   Should only a broker whose load > (avgUsage + threshold) be treated as the max usage broker?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;

Review Comment:
   Why do we need `LOWER_BOUNDARY_THRESHOLD_MARGIN`? I thought we only offload the load from the broker with load > (avg + threshold), no?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in RangeThresholdShedder is planning to shed throughput {} MByte/s less than "

Review Comment:
   We don't have a `RangeThresholdShedder`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);

Review Comment:
   We will get an empty string from getMaxUsageBroker() here, so the log will become confusing.



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -2069,6 +2069,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private String loadBalancerLoadSheddingStrategy = "org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder";
 
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "When [current usage < average usage - threshold], "
+                    + "the broker with the highest load will be triggered to unload"
+    )
+    private boolean enableLowerBoundaryShedding = false;

Review Comment:
   ```suggestion
       private boolean lowerBoundarySheddingEnabled = false;
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;

Review Comment:
   Looks like we don't need to calculate the threshold for each check.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }

Review Comment:
   How can we move on to the next most-used broker? It looks like if we have a broker with 90% load that only has one bundle, we will skip the bundle offloading, even though there may be other brokers with 80% or 70% load.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();

Review Comment:
   Would it be better to move this to line 247?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org