Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/22 10:31:41 UTC

[GitHub] [pulsar] 315157973 opened a new pull request, #17456: [feat][broker] Support lower boundary shedding for ThresholdShedder

315157973 opened a new pull request, #17456:
URL: https://github.com/apache/pulsar/pull/17456

   ### Motivation
   The existing ThresholdShedder has the following problem. For example:
   there are 11 brokers, 10 of which are loaded at 80% and 1 at 0%.
   The average load is 80 * 10 / 11 = 72.73, and the threshold to unload is 72.73 + 10 = 82.73.
   Since 80 < 82.73, unloading is not triggered, and one broker stays idle at 0% load.
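The arithmetic above can be reproduced with a short sketch. It is a hypothetical illustration, not Pulsar code: `UpperBoundExample` and its methods are invented names, loads are plain percentages, and the rule modeled is only the upper-bound check described here (shed when a broker exceeds average + threshold).

```java
import java.util.Arrays;

/**
 * Hypothetical sketch (not Pulsar code) of the upper-bound rule from the
 * motivation: a broker is shed only if its load exceeds average + threshold.
 * Loads and threshold are expressed in percentage points.
 */
final class UpperBoundExample {

    /** Mean load across all brokers. */
    static double average(double[] loads) {
        return Arrays.stream(loads).average().orElse(0.0);
    }

    /** True if any broker is above average + threshold, i.e. the upper-bound rule would shed. */
    static boolean upperBoundTriggers(double[] loads, double thresholdPct) {
        double upper = average(loads) + thresholdPct;
        return Arrays.stream(loads).anyMatch(load -> load > upper);
    }

    /** The example cluster: 10 brokers at 80% and 1 idle broker at 0%. */
    static double[] exampleLoads() {
        double[] loads = new double[11];
        Arrays.fill(loads, 0, 10, 80.0);
        loads[10] = 0.0;
        return loads;
    }
}
```

With `exampleLoads()` and a threshold of 10, the average is about 72.73, the upper bound is about 82.73, and `upperBoundTriggers` returns `false`, which is the gap this PR targets.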
   
   On top of ThresholdShedder, we add a lower-boundary check on the load.
   When [current usage < average usage - threshold] holds for any broker, the broker with the highest load is triggered to unload.
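The lower-boundary rule can be sketched as follows. This is a hypothetical, simplified illustration rather than the PR's implementation: `LowerBoundarySketch` and `selectBrokerToShed` are invented names, usage values are fractions, and the average is assumed to be the plain mean of the given values (the PR obtains it from `getBrokerAvgUsage`, whose details are not shown here).

```java
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical sketch of the lower-boundary rule; names are illustrative,
 * not Pulsar's API. Usage values are fractions between 0.0 and 1.0.
 */
final class LowerBoundarySketch {

    /**
     * Returns the broker with the highest usage if at least one broker is below
     * (avgUsage - threshold); otherwise returns empty and nothing is shed.
     */
    static Optional<String> selectBrokerToShed(Map<String, Double> usageByBroker, double threshold) {
        if (usageByBroker.isEmpty()) {
            return Optional.empty();
        }
        // Assumption: the average is the simple mean of the per-broker usage.
        double avg = usageByBroker.values().stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(0.0);
        boolean hasBrokerBelowLowerBound = usageByBroker.values().stream()
                .anyMatch(usage -> usage < avg - threshold);
        if (!hasBrokerBelowLowerBound) {
            return Optional.empty();
        }
        // Pick the most-loaded broker as the unload candidate.
        return usageByBroker.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }
}
```

Returning `Optional` instead of an empty string keeps the "no candidate" case explicit for callers.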
   
   ### Modifications
   Support lower boundary shedding for ThresholdShedder
   
   ### Verifying this change
   1. The existing ThresholdShedder behavior is not affected.
   2. The new unit test passes.
   
   ### Documentation
   
   - [ x ] `doc-required` 
   (Your PR needs to update docs and you will update later)
    


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] 315157973 commented on a diff in pull request #17456: [feat][broker] Support lower boundary shedding for ThresholdShedder

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r976102889


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,60 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null) {
+            log.info("Load data is null or bundle <=1, skipping bundle unload.");
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in lower boundary shedding is planning to shed throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
+            return;
+        }
+        filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
+                minimumThroughputToOffload);
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = avgUsage - threshold;

Review Comment:
   It does not solve the problem.
   We can only decide whether to select the current `Broker` if we know which `Bundle` will be unloaded and how much load that `Bundle` carries. Currently, this class is only responsible for selecting brokers and has no information about the bundles to be unloaded.
   
   The best way to solve this problem is to split the bundle.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

codelipenghui commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r970178874


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in RangeThresholdShedder is planning to shed throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
+            return;
+        }
+        filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
+                minimumThroughputToOffload);
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = -1;
+        boolean hasBrokerBelowLowerBound = false;
+        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
+            String broker = entry.getKey();
+            double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+            // Select the broker with the most resource usage.
+            if (currentUsage > maxUsage) {

Review Comment:
   I see.





[GitHub] [pulsar] 315157973 commented on pull request #17456: [feat][broker] Support lower boundary shedding for ThresholdShedder

315157973 commented on PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#issuecomment-1330763323

   > [momo-jun](/momo-jun)
   
   Please help review this PR https://github.com/apache/pulsar/pull/18675




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

codelipenghui commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r969740541


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in RangeThresholdShedder is planning to shed throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
+            return;
+        }
+        filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
+                minimumThroughputToOffload);
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = -1;
+        boolean hasBrokerBelowLowerBound = false;
+        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
+            String broker = entry.getKey();
+            double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+            // Select the broker with the most resource usage.
+            if (currentUsage > maxUsage) {

Review Comment:
   Shouldn't only a broker with load > (avgUsage + threshold) be treated as the max-usage broker?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;

Review Comment:
   Why do we need `LOWER_BOUNDARY_THRESHOLD_MARGIN`? I thought we only offload load from a broker with load > (avg + threshold), no?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in RangeThresholdShedder is planning to shed throughput {} MByte/s less than "

Review Comment:
   We don't have a `RangeThresholdShedder` class.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);

Review Comment:
   We may get an empty string from `getMaxUsageBroker()`, which makes this log message confusing.



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -2069,6 +2069,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private String loadBalancerLoadSheddingStrategy = "org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder";
 
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "When [current usage < average usage - threshold], "
+                    + "the broker with the highest load will be triggered to unload"
+    )
+    private boolean enableLowerBoundaryShedding = false;

Review Comment:
   ```suggestion
       private boolean lowerBoundarySheddingEnabled = false;
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;

Review Comment:
   It looks like we don't need to recalculate the threshold on every check.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }

Review Comment:
   How can we move on to the next most-used broker? Suppose a broker has 90% load but only one bundle: we will skip bundle offloading, even though there may be other brokers at 80% or 70% load.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();

Review Comment:
   Would it be better to move this to line 247?





[GitHub] [pulsar] 315157973 commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

315157973 commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r969809607


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;

Review Comment:
   For example, suppose the min load is 0.49, the avg load is 0.6, the max load is 0.62, and the threshold is 10%. If we directly offload 10% of the max-load broker's throughput, its load may drop to 0.52, so it becomes the min-load broker. This causes bundles to be offloaded at high frequency.





[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

hangc0276 commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r962631731


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/RangeThresholdShedder.java:
##########
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import com.google.common.collect.Multimap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+
+/**
+ * On the basis of ThresholdShedder, RangeThresholdShedder adds the lower boundary judgment of the load.
+ * When 【current usage < average usage - threshold】, the broker with the highest load will be triggered to unload,
+ * avoiding the following scenarios:
+ * There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded at 0%.
+ * The average load is 80 * 10 / 11 = 72.73, and the threshold to unload is 72.73 + 10 = 82.73.
+ * Since 80 < 82.73, unload will not be trigger, and there is one idle Broker with load of 0%.
+ */
+@Slf4j
+public class RangeThresholdShedder extends ThresholdShedder {
+
+    @Override
+    public Multimap<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) {
+        super.findBundlesForUnloading(loadData, conf);
+        // Return if the bundle to unload has already been selected.
+        if (!selectedBundlesCache.isEmpty()) {
+            return selectedBundlesCache;
+        }
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, super.canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return selectedBundlesCache;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return selectedBundlesCache;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double minimumThroughputToOffload = getMinimumThroughputToOffload(threshold, localData);
+        final double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in RangeThresholdShedder is planning to shed throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
+            return selectedBundlesCache;
+        }
+        super.filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
+                minimumThroughputToOffload);
+        return selectedBundlesCache;
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = -1;
+        boolean hasBrokerBelowLowerBound = false;
+        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
+            String broker = entry.getKey();
+            double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+            // Select the broker with the most resource usage.
+            if (currentUsage > maxUsage) {
+                maxUsage = currentUsage;
+                maxUsageBrokerName = broker;
+            }
+            // Whether any brokers with low usage in the cluster.
+            if (currentUsage < avgUsage - threshold) {
+                hasBrokerBelowLowerBound = true;
+            }
+        }
+        return Pair.of(hasBrokerBelowLowerBound, maxUsageBrokerName);
+    }
+
+    private double getMinimumThroughputToOffload(double threshold, LocalBrokerData localData) {
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        return brokerCurrentThroughput * threshold;

Review Comment:
   I suggest offloading `0.5 * threshold` of brokerCurrentThroughput. For example, suppose the min load is 0.49, the avg load is 0.6, the max load is 0.62, and the threshold is 10%. If we directly offload 10% of the max-load broker's throughput, its load may drop to 0.52, so it becomes the min-load broker. This causes bundles to be offloaded at high frequency.
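The suggested change to `getMinimumThroughputToOffload` can be sketched as below, together with the minimum-throughput skip check from the diff. This is a hypothetical standalone version: the class name, the `skipUnload` helper, the margin value of 0.5 (taken from the suggestion, since the PR's actual `LOWER_BOUNDARY_THRESHOLD_MARGIN` value is not shown in this thread), and `MB = 1024 * 1024` bytes are all assumptions.

```java
/**
 * Hypothetical sketch of the suggestion: shed only a fraction of
 * threshold * throughput so the donor broker does not overshoot and
 * become the least-loaded broker.
 */
final class OffloadAmountSketch {

    // Assumption: bytes per megabyte, used to convert the MByte/s config value.
    static final double MB = 1024 * 1024;

    // Assumption: 0.5, following the "0.5 * threshold" suggestion.
    static final double LOWER_BOUNDARY_THRESHOLD_MARGIN = 0.5;

    /** Throughput (bytes/s) to move off the selected broker. */
    static double minimumThroughputToOffload(double msgThroughputIn, double msgThroughputOut,
                                             double threshold) {
        double brokerCurrentThroughput = msgThroughputIn + msgThroughputOut;
        return brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
    }

    /** Mirrors the diff's check: skip unloading when the planned amount is below the configured minimum. */
    static boolean skipUnload(double minimumThroughputToOffload, double minThroughputThresholdMB) {
        return minThroughputThresholdMB * MB > minimumThroughputToOffload;
    }
}
```

For a broker with 60 MB/s in and 40 MB/s out and a 10% threshold, this plans to shed about 5 MB/s instead of 10 MB/s; with a 10 MB/s minimum, `skipUnload` would then return `true`.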





[GitHub] [pulsar] 315157973 commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r969832962


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }

Review Comment:
   Good catch. 
   I changed `getMaxUsageBroker` so that it only selects brokers that own more than one bundle.
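The revised selection rule can be sketched as follows. The sketch uses a hypothetical `BrokerSnapshot` holder in place of Pulsar's `BrokerData`/`LocalBrokerData`. Brokers that own zero or one bundle are skipped, so the chosen broker is the busiest among those with more than one bundle.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the revised selection rule; not Pulsar's BrokerData API. */
final class MaxUsageBrokerSketch {

    /** Minimal stand-in for a broker's smoothed usage and owned bundle count. */
    static final class BrokerSnapshot {
        final double avgUsage;
        final int bundleCount;

        BrokerSnapshot(double avgUsage, int bundleCount) {
            this.avgUsage = avgUsage;
            this.bundleCount = bundleCount;
        }
    }

    /** Returns the busiest broker that owns more than one bundle, or "" if none qualifies. */
    static String maxUsageBroker(Map<String, BrokerSnapshot> brokers) {
        String maxUsageBrokerName = "";
        double maxUsage = -1;
        for (Map.Entry<String, BrokerSnapshot> entry : brokers.entrySet()) {
            BrokerSnapshot snapshot = entry.getValue();
            // Brokers with 0 or 1 bundles are never candidates.
            if (snapshot.bundleCount > 1 && snapshot.avgUsage > maxUsage) {
                maxUsage = snapshot.avgUsage;
                maxUsageBrokerName = entry.getKey();
            }
        }
        return maxUsageBrokerName;
    }

    public static void main(String[] args) {
        Map<String, BrokerSnapshot> brokers = new LinkedHashMap<>();
        brokers.put("broker-a", new BrokerSnapshot(0.90, 1)); // busiest, but only one bundle
        brokers.put("broker-b", new BrokerSnapshot(0.70, 4));
        brokers.put("broker-c", new BrokerSnapshot(0.20, 3));
        System.out.println(maxUsageBroker(brokers)); // broker-b
    }
}
```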





[GitHub] [pulsar] 315157973 commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r971007824


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,60 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null) {
+            log.info("Load data is null or bundle <=1, skipping bundle unload.");
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in lower boundary shedding is planning to shed throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
+            return;
+        }
+        filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
+                minimumThroughputToOffload);
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = -1;
+        boolean hasBrokerBelowLowerBound = false;
+        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
+            String broker = entry.getKey();
+            BrokerData brokerData = entry.getValue();
+            double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+            // Select the broker with the most resource usage.
+            if (currentUsage > maxUsage && brokerData.getLocalData() != null
+                    && brokerData.getLocalData().getBundles().size() > 1) {
+                maxUsage = currentUsage;
+                maxUsageBrokerName = broker;

Review Comment:
   It depends on whether the broker with the maximum resource usage is within the range [avgUsage - threshold, avgUsage + threshold].
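The range can be illustrated with a hypothetical helper that classifies a broker's usage against [avgUsage - threshold, avgUsage + threshold]. The helper is applied to the 0.3 / 0.25 question raised in review. The lower-bound comparison matches the diff. The upper-bound comparison and the two- and three-broker clusters are assumptions chosen for illustration, and a plain mean stands in for the shedder's own average.

```java
/** Hypothetical helper illustrating the [avgUsage - threshold, avgUsage + threshold] range. */
final class UsageRangeSketch {

    enum Position { BELOW_LOWER_BOUND, WITHIN_RANGE, ABOVE_UPPER_BOUND }

    static Position classify(double usage, double avgUsage, double threshold) {
        // Same strict comparison as the diff's lower-bound check.
        if (usage < avgUsage - threshold) {
            return Position.BELOW_LOWER_BOUND;
        }
        // Assumed mirror-image check for the upper bound.
        if (usage > avgUsage + threshold) {
            return Position.ABOVE_UPPER_BOUND;
        }
        return Position.WITHIN_RANGE;
    }

    public static void main(String[] args) {
        double threshold = 0.10;
        // Assumed two-broker cluster {0.3, 0.25}: avg = 0.275, range = [0.175, 0.375],
        // so both brokers are in range and nothing is shed.
        double avg = (0.30 + 0.25) / 2;
        System.out.println(classify(0.30, avg, threshold)); // WITHIN_RANGE
        System.out.println(classify(0.25, avg, threshold)); // WITHIN_RANGE
        // Add an idle third broker: avg ~= 0.183, lower bound ~= 0.083, so the idle
        // broker is below it and lower-boundary shedding would be considered.
        double avg3 = (0.30 + 0.25 + 0.0) / 3;
        System.out.println(classify(0.0, avg3, threshold)); // BELOW_LOWER_BOUND
    }
}
```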





[GitHub] [pulsar] codelipenghui closed pull request #17456: [feat][broker] Support lower boundary shedding for ThresholdShedder

Posted by GitBox <gi...@apache.org>.
codelipenghui closed pull request #17456: [feat][broker] Support lower boundary shedding for ThresholdShedder
URL: https://github.com/apache/pulsar/pull/17456




[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r965156170


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -2069,6 +2069,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private String loadBalancerLoadSheddingStrategy = "org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder";
 
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "When 【current usage < average usage - threshold】, "
+                    + "the broker with the highest load will be triggered to unload"
+    )
+    private boolean enableLowerBoundaryShedding = false;

Review Comment:
   The value for `LOWER_BOUNDARY_THRESHOLD_MARGIN` is hard-coded to 0.5. Instead of enabling this lower bound with a boolean flag, I propose that we make the lower boundary configurable.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r970179455


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;

Review Comment:
   Thanks, I understand this part now.
   
   I had thought we would only offload load from brokers whose load > avg + threshold:
   https://github.com/apache/pulsar/pull/17456#discussion_r969814799
   





[GitHub] [pulsar] 315157973 commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r969814799


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in RangeThresholdShedder is planning to shed throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
+            return;
+        }
+        filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
+                minimumThroughputToOffload);
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = -1;
+        boolean hasBrokerBelowLowerBound = false;
+        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
+            String broker = entry.getKey();
+            double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+            // Select the broker with the most resource usage.
+            if (currentUsage > maxUsage) {

Review Comment:
   I set the initial value of `maxUsage` to `avgUsage + threshold`





[GitHub] [pulsar] lhotari commented on pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
lhotari commented on PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#issuecomment-1241466776

   @315157973 you must rebase (or use other ways to get latest master changes) to make this PR pass the new required checks. Please see the mailing list announcement https://lists.apache.org/thread/lv358906mnroq7tkk72xtsxfyjpbyvor.




[GitHub] [pulsar] momo-jun commented on pull request #17456: [feat][broker] Support lower boundary shedding for ThresholdShedder

Posted by GitBox <gi...@apache.org>.
momo-jun commented on PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#issuecomment-1330174916

   > > @315157973 It will help users understand this feature if you can add the related docs. Do you have any planned updates on that?
   > 
   > I'll submit the docs soon.
   
   @315157973 any updates on the docs?




[GitHub] [pulsar] 315157973 commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r969814799


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in RangeThresholdShedder is planning to shed throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
+            return;
+        }
+        filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
+                minimumThroughputToOffload);
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = -1;
+        boolean hasBrokerBelowLowerBound = false;
+        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
+            String broker = entry.getKey();
+            double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+            // Select the broker with the most resource usage.
+            if (currentUsage > maxUsage) {

Review Comment:
   If load > (avgUsage + threshold), the existing upper-boundary shedding already unloads that broker, so there is no need to unload it again based on the lower boundary.
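The order of checks described in this reply can be sketched as follows. This is a hedged illustration, not `ThresholdShedder`'s code. It uses a plain mean instead of the shedder's smoothed average, assumes a strict `>` for the upper bound, and reuses the 11-broker scenario from the PR description.

```java
import java.util.Arrays;

/** Hypothetical sketch of the order of checks; not ThresholdShedder's actual code. */
final class SheddingDecisionSketch {

    enum Decision { UPPER_BOUNDARY, LOWER_BOUNDARY, NONE }

    static Decision decide(double[] usages, double threshold) {
        // Plain mean; the real shedder derives avgUsage from its own smoothed history.
        double avgUsage = Arrays.stream(usages).average().orElse(0.0);
        boolean anyAboveUpper = false;
        boolean anyBelowLower = false;
        for (double usage : usages) {
            if (usage > avgUsage + threshold) {
                anyAboveUpper = true;
            }
            if (usage < avgUsage - threshold) {
                anyBelowLower = true;
            }
        }
        // Overloaded brokers are handled by the existing upper-boundary logic first.
        if (anyAboveUpper) {
            return Decision.UPPER_BOUNDARY;
        }
        // Only otherwise does an underloaded broker trigger lower-boundary shedding.
        if (anyBelowLower) {
            return Decision.LOWER_BOUNDARY;
        }
        return Decision.NONE;
    }

    public static void main(String[] args) {
        // PR description: 10 brokers at 80%, 1 at 0%, threshold 10%.
        double[] usages = new double[11];
        Arrays.fill(usages, 0, 10, 0.80);
        usages[10] = 0.0;
        // avg ~= 0.7273: upper bound ~= 0.8273 (no broker above it),
        // lower bound ~= 0.6273 (the idle broker is below it).
        System.out.println(decide(usages, 0.10)); // LOWER_BOUNDARY
    }
}
```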





[GitHub] [pulsar] codelipenghui commented on pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#issuecomment-1249209057

   @315157973 Please also update the PR title; it should describe this as an improvement to the threshold shedder.




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r970183644


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,60 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null) {
+            log.info("Load data is null or bundle <=1, skipping bundle unload.");
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in lower boundary shedding is planning to shed throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
+            return;
+        }
+        filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
+                minimumThroughputToOffload);
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = -1;
+        boolean hasBrokerBelowLowerBound = false;
+        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
+            String broker = entry.getKey();
+            BrokerData brokerData = entry.getValue();
+            double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+            // Select the broker with the most resource usage.
+            if (currentUsage > maxUsage && brokerData.getLocalData() != null
+                    && brokerData.getLocalData().getBundles().size() > 1) {
+                maxUsage = currentUsage;
+                maxUsageBrokerName = broker;
+            }
+            // Whether any brokers with low usage in the cluster.
+            if (currentUsage < avgUsage - threshold) {
+                hasBrokerBelowLowerBound = true;

Review Comment:
   Do we need to check if the max usage broker is the same as the broker below the lower bound?
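The revised `getMaxUsageBroker` only considers brokers with more than one bundle. As a result, the selected broker is not always the busiest one overall, and in principle it can itself be below the lower bound. The sketch below constructs such a case. The `Broker` holder, the broker names and the plain-mean average are hypothetical.

```java
import java.util.List;

/** Hypothetical sketch for the "same broker" question; not Pulsar's API. */
final class SameBrokerCheckSketch {

    static final class Broker {
        final String name;
        final double usage;
        final int bundleCount;

        Broker(String name, double usage, int bundleCount) {
            this.name = name;
            this.usage = usage;
            this.bundleCount = bundleCount;
        }
    }

    /**
     * Picks the busiest broker with more than one bundle (as in the revised diff)
     * and reports whether that broker is itself below avgUsage - threshold.
     */
    static boolean maxBrokerIsBelowLowerBound(List<Broker> brokers, double threshold) {
        double avgUsage = brokers.stream().mapToDouble(b -> b.usage).average().orElse(0.0);
        Broker max = null;
        for (Broker b : brokers) {
            if (b.bundleCount > 1 && (max == null || b.usage > max.usage)) {
                max = b;
            }
        }
        return max != null && max.usage < avgUsage - threshold;
    }

    public static void main(String[] args) {
        // broker-a is hot but owns a single bundle, so it is filtered out.
        // avg = 1.1 / 3 ~= 0.367, lower bound ~= 0.267; the chosen broker-b (0.1) is below it.
        List<Broker> brokers = List.of(
                new Broker("broker-a", 0.9, 1),
                new Broker("broker-b", 0.1, 5),
                new Broker("broker-c", 0.1, 3));
        System.out.println(maxBrokerIsBelowLowerBound(brokers, 0.1)); // true
    }
}
```

In such a case, the shedder would unload bundles from a broker that is itself underloaded. A guard like this is one way to skip that round.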



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,60 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null) {
+            log.info("Load data is null or bundle <=1, skipping bundle unload.");
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in lower boundary shedding is planning to shed throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
+            return;
+        }
+        filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
+                minimumThroughputToOffload);
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = -1;
+        boolean hasBrokerBelowLowerBound = false;
+        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
+            String broker = entry.getKey();
+            BrokerData brokerData = entry.getValue();
+            double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+            // Select the broker with the most resource usage.
+            if (currentUsage > maxUsage && brokerData.getLocalData() != null
+                    && brokerData.getLocalData().getBundles().size() > 1) {
+                maxUsage = currentUsage;
+                maxUsageBrokerName = broker;

Review Comment:
   The max usage broker's load could be 0.3 while the lowest-loaded broker's is 0.25.
   In that case, we don't need to unload from the max usage broker, right?





[GitHub] [pulsar] 315157973 commented on pull request #17456: [feat][broker] Support lower boundary shedding for ThresholdShedder

Posted by GitBox <gi...@apache.org>.
315157973 commented on PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#issuecomment-1286494795

   > @315157973 It will help users understand this feature if you can add the related docs. Do you have any planned updates on that?
   
   I'll submit the docs soon.




[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r964409454


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -2069,6 +2069,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private String loadBalancerLoadSheddingStrategy = "org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder";
 
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "When 【current usage < average usage - threshold】, "
+                    + "the broker with the highest load will be triggered to unload"
+    )
+    private boolean enableLowerBoundaryShedding = false;

Review Comment:
   What if we made the lower boundary a double between 0 and 1 and have it default to 0? Then, the lower bound is configurable.
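The following sketches one possible reading of this proposal. The field name `lowerBoundarySheddingMargin` is hypothetical and not an actual Pulsar setting. Under this reading, a value in [0, 1] replaces both the boolean flag and the hard-coded `LOWER_BOUNDARY_THRESHOLD_MARGIN`, and the default of 0 leaves lower-boundary shedding disabled.

```java
/**
 * Hypothetical sketch of replacing the boolean flag with a double in [0, 1].
 * The field name and the "0 disables" semantics are assumptions, not Pulsar config.
 */
final class LowerBoundaryConfigSketch {

    private final double lowerBoundarySheddingMargin;

    LowerBoundaryConfigSketch(double lowerBoundarySheddingMargin) {
        // Reject NaN and values outside [0, 1].
        if (!(lowerBoundarySheddingMargin >= 0.0 && lowerBoundarySheddingMargin <= 1.0)) {
            throw new IllegalArgumentException(
                    "lowerBoundarySheddingMargin must be in [0, 1]: " + lowerBoundarySheddingMargin);
        }
        this.lowerBoundarySheddingMargin = lowerBoundarySheddingMargin;
    }

    /** Default of 0 keeps the feature off, like enableLowerBoundaryShedding = false. */
    static LowerBoundaryConfigSketch defaults() {
        return new LowerBoundaryConfigSketch(0.0);
    }

    boolean lowerBoundarySheddingEnabled() {
        return lowerBoundarySheddingMargin > 0.0;
    }

    /** Uses the configured value where the diff uses LOWER_BOUNDARY_THRESHOLD_MARGIN. */
    double minimumThroughputToOffload(double brokerCurrentThroughput, double threshold) {
        return brokerCurrentThroughput * threshold * lowerBoundarySheddingMargin;
    }

    public static void main(String[] args) {
        System.out.println(defaults().lowerBoundarySheddingEnabled()); // false
        LowerBoundaryConfigSketch half = new LowerBoundaryConfigSketch(0.5);
        System.out.println(half.minimumThroughputToOffload(200.0, 0.10));
    }
}
```

A single numeric setting avoids a flag and a constant that must be kept consistent. However, it makes 0 carry two meanings ("off" and "shed nothing"), which the docs would need to state.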





[GitHub] [pulsar] codelipenghui commented on pull request #17456: Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#issuecomment-1236500012

   As described in the issue, this appears to be an obvious flaw in the current `ThresholdShedder`. @315157973 Do you see any negative effects if we push the improvement to `ThresholdShedder` directly?




[GitHub] [pulsar] Jason918 commented on a diff in pull request #17456: Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r962457723


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/RangeThresholdShedder.java:
##########
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import com.google.common.collect.Multimap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+
+/**
+ * On the basis of ThresholdShedder, RangeThresholdShedder adds a lower-boundary check on the load.
+ * When 【current usage < average usage - threshold】, the broker with the highest load will be triggered to unload,
+ * avoiding the following scenarios:
+ * There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded at 0%.
+ * The average load is 80 * 10 / 11 = 72.73, and the threshold to unload is 72.73 + 10 = 82.73.
+ * Since 80 < 82.73, unload will not be triggered, and there is one idle Broker with a load of 0%.
+ */
+@Slf4j
+public class RangeThresholdShedder extends ThresholdShedder {

Review Comment:
   This seems to be a nice improvement to `ThresholdShedder`. Should we just name it something like `ThresholdShedderV2`? That way, once it is considered stable, we can deprecate the old `ThresholdShedder`.
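The arithmetic in the Javadoc above can be checked with a small standalone sketch. It is a simplification, not `ThresholdShedder` itself: each broker's usage is a single fraction in [0, 1], the historical smoothing that `ThresholdShedder` applies to usage is ignored, and the class and method names are hypothetical. The threshold is taken as 10%, matching the numbers in the description.

```java
import java.util.Arrays;

// Hypothetical sketch of the motivating scenario (not Pulsar code).
// Usage values are fractions in [0, 1]; history smoothing is ignored.
final class ThresholdBoundsExample {

    static double average(double[] usage) {
        return Arrays.stream(usage).average().orElse(0.0);
    }

    // Existing trigger: some broker is above avg + threshold.
    static boolean upperBoundTriggered(double[] usage, double threshold) {
        double upper = average(usage) + threshold;
        return Arrays.stream(usage).anyMatch(u -> u > upper);
    }

    // Lower-boundary trigger discussed in this PR: some broker is below avg - threshold.
    static boolean lowerBoundTriggered(double[] usage, double threshold) {
        double lower = average(usage) - threshold;
        return Arrays.stream(usage).anyMatch(u -> u < lower);
    }

    public static void main(String[] args) {
        double[] usage = new double[11];
        Arrays.fill(usage, 0, 10, 0.80); // ten brokers at 80%
        usage[10] = 0.0;                 // one idle broker
        double threshold = 10 / 100.0;   // shedder percentage of 10, as in the description
        double avg = average(usage);
        System.out.printf("avg=%.4f upper=%.4f lower=%.4f%n", avg, avg + threshold, avg - threshold);
        System.out.println("upper-bound shedding: " + upperBoundTriggered(usage, threshold));
        System.out.println("lower-bound shedding: " + lowerBoundTriggered(usage, threshold));
    }
}
```

With these numbers the upper bound (about 0.8273) is never exceeded, while the idle broker sits below the lower bound (about 0.6273), which is exactly the gap the PR targets.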





[GitHub] [pulsar] 315157973 commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r969827851


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null || brokerData.getLocalData() == null
+                || brokerData.getLocalData().getBundles().size() <= 1) {
+            log.info("Load data is null or bundle <=1, broker name is {}, skipping bundle unload.", maxUsageBroker);
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();

Review Comment:
   Why? They look unrelated.





[GitHub] [pulsar] 315157973 commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r969839341


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,59 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;

Review Comment:
   The value is computed at most once per call. It depends on the method's parameters (the `conf` passed in), so it cannot be fixed in the constructor and has to be recomputed on every call.





[GitHub] [pulsar] 315157973 commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r971005712


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,60 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null) {
+            log.info("Load data is null or bundle <=1, skipping bundle unload.");
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in lower boundary shedding is planning to shed throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
+            return;
+        }
+        filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
+                minimumThroughputToOffload);
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = -1;
+        boolean hasBrokerBelowLowerBound = false;
+        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
+            String broker = entry.getKey();
+            BrokerData brokerData = entry.getValue();
+            double currentUsage = brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+            // Select the broker with the most resource usage.
+            if (currentUsage > maxUsage && brokerData.getLocalData() != null
+                    && brokerData.getLocalData().getBundles().size() > 1) {
+                maxUsage = currentUsage;
+                maxUsageBrokerName = broker;
+            }
+            // Whether any brokers with low usage in the cluster.
+            if (currentUsage < avgUsage - threshold) {
+                hasBrokerBelowLowerBound = true;

Review Comment:
   The initial value of `maxUsage` has been set to `avgUsage - threshold`.
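A minimal sketch of the selection loop with that seed value, using plain maps in place of `LoadData`/`BrokerData` (the class, method, and map names are hypothetical), shows the effect: a broker is only chosen as the source if its usage is strictly above `avgUsage - threshold` and it owns more than one bundle, while the same pass records whether any broker is below that bound.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch mirroring getMaxUsageBroker from the diff (not Pulsar code).
final class MaxUsageBrokerSketch {

    static final class Selection {
        final boolean hasBrokerBelowLowerBound;
        final String maxUsageBroker; // "" when no broker qualifies

        Selection(boolean hasBrokerBelowLowerBound, String maxUsageBroker) {
            this.hasBrokerBelowLowerBound = hasBrokerBelowLowerBound;
            this.maxUsageBroker = maxUsageBroker;
        }
    }

    static Selection select(Map<String, Double> usageByBroker, Map<String, Integer> bundleCountByBroker,
                            double threshold, double avgUsage) {
        double lowerBound = avgUsage - threshold;
        // Seeding with the lower bound (instead of -1) means only brokers strictly
        // above avgUsage - threshold can become the shedding source.
        double maxUsage = lowerBound;
        String maxUsageBroker = "";
        boolean hasBrokerBelowLowerBound = false;
        for (Map.Entry<String, Double> entry : usageByBroker.entrySet()) {
            double usage = entry.getValue();
            // Mirrors the getBundles().size() > 1 check in the diff.
            if (usage > maxUsage && bundleCountByBroker.getOrDefault(entry.getKey(), 0) > 1) {
                maxUsage = usage;
                maxUsageBroker = entry.getKey();
            }
            if (usage < lowerBound) {
                hasBrokerBelowLowerBound = true;
            }
        }
        return new Selection(hasBrokerBelowLowerBound, maxUsageBroker);
    }

    public static void main(String[] args) {
        Map<String, Double> usage = new LinkedHashMap<>();
        Map<String, Integer> bundles = new LinkedHashMap<>();
        for (int i = 0; i < 11; i++) {
            usage.put("broker-" + i, i == 10 ? 0.0 : 0.80); // ten at 80%, one idle
            bundles.put("broker-" + i, 10);
        }
        double avg = usage.values().stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
        Selection s = select(usage, bundles, 0.10, avg);
        System.out.println(s.hasBrokerBelowLowerBound + " " + s.maxUsageBroker);
    }
}
```

Because the comparison is strict, ties between equally loaded brokers resolve to the first one in iteration order.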





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r972056579


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java:
##########
@@ -224,4 +225,114 @@ public void testPrintResourceUsage() {
         assertEquals(data.printResourceUsage(),
             "cpu: 10.00%, memory: 50.00%, directMemory: 90.00%, bandwidthIn: 30.00%, bandwidthOut: 20.00%");
     }
+
+    @Test
+    public void testRangeThroughput() {
+        int numBundles = 10;
+        int brokerNum = 11;
+        int lowLoadNode = 10;
+        LoadData loadData = new LoadData();
+        double throughput = 100 * 1024 * 1024;
+        //There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded at 0%.
+        //At this time, the average load is 80*10/11 = 72.73, and the threshold for rebalancing is 72.73 + 10 = 82.73.
+        //Since 80 < 82.73, rebalancing will not be triggered, and one Broker is left with a load of 0.
+        for (int i = 0; i < brokerNum; i++) {
+            LocalBrokerData broker = new LocalBrokerData();
+            for (int j = 0; j < numBundles; j++) {
+                broker.getBundles().add("bundle-" + j);
+                BundleData bundle = new BundleData();
+                TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData();
+                timeAverageMessageData.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
+                timeAverageMessageData.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
+                bundle.setShortTermData(timeAverageMessageData);
+                String broker2BundleName = "broker-" + i + "-bundle-" + j;
+                loadData.getBundleData().put(broker2BundleName, bundle);
+                broker.getBundles().add(broker2BundleName);
+            }
+            broker.setBandwidthIn(new ResourceUsage(i == lowLoadNode ? 0 : 80, 100));
+            broker.setBandwidthOut(new ResourceUsage(i == lowLoadNode ? 0 : 80, 100));
+            broker.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
+            broker.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
+            loadData.getBrokerData().put("broker-" + i, new BrokerData(broker));
+        }
+        ThresholdShedder shedder = new ThresholdShedder();
+        Multimap<String, String> bundlesToUnload = shedder.findBundlesForUnloading(loadData, conf);
+        assertTrue(bundlesToUnload.isEmpty());
+        conf.setLowerBoundarySheddingEnabled(true);
+        bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData, conf);
+        assertFalse(bundlesToUnload.isEmpty());
+    }
+
+    @Test
+    public void testNoBrokerToOffload() {
+        int numBundles = 10;
+        int brokerNum = 11;
+        LoadData loadData = new LoadData();
+        double throughput = 80 * 1024 * 1024;
+        //Load of all Brokers are 80%, and no Broker needs to offload.
+        for (int i = 0; i < brokerNum; i++) {
+            LocalBrokerData broker = new LocalBrokerData();
+            for (int j = 0; j < numBundles; j++) {
+                broker.getBundles().add("bundle-" + j);
+                BundleData bundle = new BundleData();
+                TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData();
+                timeAverageMessageData.setMsgThroughputIn(throughput);
+                timeAverageMessageData.setMsgThroughputOut(throughput);
+                bundle.setShortTermData(timeAverageMessageData);
+                String broker2BundleName = "broker-" + i + "-bundle-" + j;
+                loadData.getBundleData().put(broker2BundleName, bundle);
+                broker.getBundles().add(broker2BundleName);
+            }
+            broker.setBandwidthIn(new ResourceUsage(80, 100));
+            broker.setBandwidthOut(new ResourceUsage(80, 100));
+            broker.setMsgThroughputIn(throughput);
+            broker.setMsgThroughputOut(throughput);
+            loadData.getBrokerData().put("broker-" + i, new BrokerData(broker));
+        }
+        ThresholdShedder shedder = new ThresholdShedder();
+        Multimap<String, String> bundlesToUnload = shedder.findBundlesForUnloading(loadData, conf);
+        assertTrue(bundlesToUnload.isEmpty());
+        conf.setLowerBoundarySheddingEnabled(true);
+        bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData, conf);
+        assertTrue(bundlesToUnload.isEmpty());
+    }
+
+    @Test
+    public void testBrokerWithOneBundle() {

Review Comment:
   ```suggestion
       public void testLowerBoundarySheddingBrokerWithOneBundle() {
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java:
##########
@@ -224,4 +225,114 @@ public void testPrintResourceUsage() {
         assertEquals(data.printResourceUsage(),
             "cpu: 10.00%, memory: 50.00%, directMemory: 90.00%, bandwidthIn: 30.00%, bandwidthOut: 20.00%");
     }
+
+    @Test
+    public void testRangeThroughput() {
+        int numBundles = 10;
+        int brokerNum = 11;
+        int lowLoadNode = 10;
+        LoadData loadData = new LoadData();
+        double throughput = 100 * 1024 * 1024;
+        //There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded at 0%.
+        //At this time, the average load is 80*10/11 = 72.73, and the threshold for rebalancing is 72.73 + 10 = 82.73.
+        //Since 80 < 82.73, rebalancing will not be triggered, and one Broker is left with a load of 0.
+        for (int i = 0; i < brokerNum; i++) {
+            LocalBrokerData broker = new LocalBrokerData();
+            for (int j = 0; j < numBundles; j++) {
+                broker.getBundles().add("bundle-" + j);
+                BundleData bundle = new BundleData();
+                TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData();
+                timeAverageMessageData.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
+                timeAverageMessageData.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
+                bundle.setShortTermData(timeAverageMessageData);
+                String broker2BundleName = "broker-" + i + "-bundle-" + j;
+                loadData.getBundleData().put(broker2BundleName, bundle);
+                broker.getBundles().add(broker2BundleName);
+            }
+            broker.setBandwidthIn(new ResourceUsage(i == lowLoadNode ? 0 : 80, 100));
+            broker.setBandwidthOut(new ResourceUsage(i == lowLoadNode ? 0 : 80, 100));
+            broker.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
+            broker.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
+            loadData.getBrokerData().put("broker-" + i, new BrokerData(broker));
+        }
+        ThresholdShedder shedder = new ThresholdShedder();
+        Multimap<String, String> bundlesToUnload = shedder.findBundlesForUnloading(loadData, conf);
+        assertTrue(bundlesToUnload.isEmpty());
+        conf.setLowerBoundarySheddingEnabled(true);
+        bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData, conf);
+        assertFalse(bundlesToUnload.isEmpty());
+    }
+
+    @Test
+    public void testNoBrokerToOffload() {

Review Comment:
   ```suggestion
       public void testLowerBoundarySheddingNoBrokerToOffload() {
   ```





[GitHub] [pulsar] 315157973 commented on pull request #17456: Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
315157973 commented on PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#issuecomment-1236340331

   We are not sure that the new approach is completely sound. ThresholdShedder is now the default algorithm, so if there are problems they will affect existing business; it is better to run it separately.




[GitHub] [pulsar] 315157973 commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r964986961


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -2069,6 +2069,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private String loadBalancerLoadSheddingStrategy = "org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder";
 
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "When 【current usage < average usage - threshold】, "
+                    + "the broker with the highest load will be triggered to unload"
+    )
+    private boolean enableLowerBoundaryShedding = false;

Review Comment:
   What does it mean to set a value between 0 and 1?





[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r964408741


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -2069,6 +2069,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private String loadBalancerLoadSheddingStrategy = "org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder";
 
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "When 【current usage < average usage - threshold】, "

Review Comment:
   Nit:
   
   ```suggestion
               doc = "When [current usage < average usage - threshold], "
   ```



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -2069,6 +2069,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private String loadBalancerLoadSheddingStrategy = "org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder";
 
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "When 【current usage < average usage - threshold】, "
+                    + "the broker with the highest load will be triggered to unload"
+    )
+    private boolean enableLowerBoundaryShedding = false;

Review Comment:
   What if we made the lower boundary a double between 0 and 1 and had it default to 0? Then, the threshold is configurable.





[GitHub] [pulsar] 315157973 commented on pull request #17456: Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
315157973 commented on PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#issuecomment-1236501627

   > Based on the issue description, this appears to be an obvious flaw in the current `ThresholdShedder`. @315157973 Do you see any negative effects if we push the improvement to `ThresholdShedder` directly?
   
   As I said above, there is no way to guarantee that this new feature will be fully stable, and if something goes wrong, we can't even turn it off.




[GitHub] [pulsar] 315157973 commented on pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
315157973 commented on PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#issuecomment-1238135168

   > Choosing between this implementation and the original one is something that nobody would ever do in production.
   
   Please help review (CR) this; I will verify it in a production environment.




[GitHub] [pulsar] 315157973 commented on pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
315157973 commented on PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#issuecomment-1237579126

   @eolivelli  PTAL




[GitHub] [pulsar] 315157973 merged pull request #17456: [feat][broker] Support lower boundary shedding for ThresholdShedder

Posted by GitBox <gi...@apache.org>.
315157973 merged PR #17456:
URL: https://github.com/apache/pulsar/pull/17456




[GitHub] [pulsar] momo-jun commented on pull request #17456: [feat][broker] Support lower boundary shedding for ThresholdShedder

Posted by GitBox <gi...@apache.org>.
momo-jun commented on PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#issuecomment-1286436590

   @315157973 It will help users understand this feature if you can add the related docs. Do you have any planned updates on that?




[GitHub] [pulsar] 315157973 commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r965798851


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -2069,6 +2069,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private String loadBalancerLoadSheddingStrategy = "org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder";
 
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "When 【current usage < average usage - threshold】, "
+                    + "the broker with the highest load will be triggered to unload"
+    )
+    private boolean enableLowerBoundaryShedding = false;

Review Comment:
   `LoadBalancerBrokerThresholdShedderPercentage` already allows users to control the percentage.
   
   





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17456: [feat][broker] Add a new range threshold shedder

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r972048793


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java:
##########
@@ -224,4 +225,114 @@ public void testPrintResourceUsage() {
         assertEquals(data.printResourceUsage(),
             "cpu: 10.00%, memory: 50.00%, directMemory: 90.00%, bandwidthIn: 30.00%, bandwidthOut: 20.00%");
     }
+
+    @Test
+    public void testRangeThroughput() {

Review Comment:
   ```suggestion
       public void testLowerBoundaryShedding() {
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,60 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null) {
+            log.info("Load data is null or bundle <=1, skipping bundle unload.");
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in lower boundary shedding is planning to shed throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
+            return;
+        }
+        filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
+                minimumThroughputToOffload);
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = avgUsage - threshold;

Review Comment:
   Max-usage broker load: > (avgUsage - threshold)
   Below-lower-bound broker load: < (avgUsage - threshold)
   
   After the max-usage broker unloads to the lower broker, the max-usage broker might become the lower broker, and the lower broker might become the max-usage broker. Could this lead to frequent bundle unloading?
   
   It looks like we need to change 
   
   ```java
   double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
   ```
   
   to 
   
   ```java
   double minimumThroughputToOffload = Math.min(brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN, brokerCurrentThroughput - avgUsage - threshold);
   ```
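The suggested expression subtracts `avgUsage` and `threshold` (usage ratios) directly from `brokerCurrentThroughput` (bytes/s). The sketch below shows one unit-consistent reading of the same idea, under the loudly stated assumption that a broker's usage scales linearly with its message throughput: cap the fixed fraction at the throughput that would bring the shedding broker down to `avgUsage`, so that, if the average stays the same, the broker is not itself pushed below `avgUsage - threshold`. The helper name and the `margin` value are hypothetical; the real value of `LOWER_BOUNDARY_THRESHOLD_MARGIN` is not shown in this thread.

```java
// Hypothetical helper (not Pulsar code). ASSUMPTION: on the shedding broker,
// resource usage scales linearly with msgThroughputIn + msgThroughputOut.
final class LowerBoundaryOffloadSketch {

    static double minimumThroughputToOffload(double brokerThroughput, double brokerUsage,
                                             double avgUsage, double threshold, double margin) {
        // Amount computed in the diff: a fixed fraction of current throughput.
        double proposed = brokerThroughput * threshold * margin;
        if (brokerUsage <= 0.0) {
            return 0.0;
        }
        // Throughput whose removal would bring usage down to exactly avgUsage,
        // converted from a usage difference via the linearity assumption.
        double capToAverage = Math.max(0.0, brokerThroughput * (brokerUsage - avgUsage) / brokerUsage);
        return Math.min(proposed, capToAverage);
    }

    public static void main(String[] args) {
        double mb = 1024 * 1024;
        double throughput = 200 * mb;  // in + out
        double avg = 0.80 * 10 / 11;   // motivating scenario
        // margin = 0.5 is illustrative only
        System.out.println(minimumThroughputToOffload(throughput, 0.80, avg, 0.10, 0.5) / mb);
        // Broker only slightly above average: the cap, not the fixed fraction, decides.
        System.out.println(minimumThroughputToOffload(throughput, 0.65, 0.60, 0.10, 1.0) / mb);
    }
}
```

Since the result is a minimum and bundles move whole, the throughput actually shed could still overshoot the cap, so bundle selection would probably need a matching bound as well.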



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -227,4 +236,60 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
         return historyUsage;
     }
 
+    private void tryLowerBoundaryShedding(LoadData loadData, ServiceConfiguration conf) {
+        // Select the broker with the most resource usage.
+        final double threshold = conf.getLoadBalancerBrokerThresholdShedderPercentage() / 100.0;
+        final double avgUsage = getBrokerAvgUsage(loadData, conf, canSampleLog());
+        Pair<Boolean, String> result = getMaxUsageBroker(loadData, threshold, avgUsage);
+        boolean hasBrokerBelowLowerBound = result.getLeft();
+        String maxUsageBroker = result.getRight();
+        BrokerData brokerData = loadData.getBrokerData().get(maxUsageBroker);
+        if (brokerData == null) {
+            log.info("Load data is null or bundle <=1, skipping bundle unload.");
+            return;
+        }
+        if (!hasBrokerBelowLowerBound) {
+            log.info("No broker is below the lower bound, threshold is {}, "
+                            + "avgUsage usage is {}, max usage of Broker {} is {}",
+                    threshold, avgUsage, maxUsageBroker,
+                    brokerAvgResourceUsage.getOrDefault(maxUsageBroker, 0.0));
+            return;
+        }
+        LocalBrokerData localData = brokerData.getLocalData();
+        double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
+        double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN;
+        double minThroughputThreshold = conf.getLoadBalancerBundleUnloadMinThroughputThreshold() * MB;
+        if (minThroughputThreshold > minimumThroughputToOffload) {
+            log.info("broker {} in lower boundary shedding is planning to shed throughput {} MByte/s less than "
+                            + "minimumThroughputThreshold {} MByte/s, skipping bundle unload.",
+                    maxUsageBroker, minimumThroughputToOffload / MB, minThroughputThreshold / MB);
+            return;
+        }
+        filterAndSelectBundle(loadData, loadData.getRecentlyUnloadedBundles(), maxUsageBroker, localData,
+                minimumThroughputToOffload);
+    }
+
+    private Pair<Boolean, String> getMaxUsageBroker(
+            LoadData loadData, double threshold, double avgUsage) {
+        String maxUsageBrokerName = "";
+        double maxUsage = avgUsage - threshold;

Review Comment:
   And we should also add a test for this case.


