Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/29 09:57:20 UTC

[GitHub] [pulsar] HQebupt opened a new pull request, #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

HQebupt opened a new pull request, #16281:
URL: https://github.com/apache/pulsar/pull/16281

   ### Motivation
   See PIP-181 #16274 
   
   ### Modifications
   See #16274 
   
   ### Verifying this change
   
   - [x]  Make sure that the change passes the CI checks.
   
   This change is a trivial rework / code cleanup without any test coverage.
   ### Does this pull request potentially affect one of the following parts:
   
   If `yes` was chosen, please highlight the changes
   
   - Dependencies (does it add or upgrade a dependency): (no)
   - The public API: (no)
   - The schema: (no)
   - The default values of configurations: (no)
   - The wire protocol: (no)
   - The rest endpoints: (no)
   - The admin cli options: (no)
   - Anything that affects deployment: (no)
   
   ### Documentation
   Check the box below and label this PR (if you have committer privilege).
   
   Need to update docs? 
   - [x] `doc` 
      Doc added in admin client api.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#issuecomment-1176050080

   /pulsarbot run-failure-checks




[GitHub] [pulsar] HQebupt commented on pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#issuecomment-1175705523

   /pulsarbot run-failure-checks




[GitHub] [pulsar] dragonls commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
dragonls commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r911739727


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // Form a score for a broker using its historical load and short-term load data with weight.
+    // Any broker at (or above) the overload threshold will have a score of POSITIVE_INFINITY.
+    private double getScore(final String broker, final BrokerData brokerData, final ServiceConfiguration conf) {
+        final double overloadThresholdPercentage = conf.getLoadBalancerBrokerOverloadedThresholdPercentage();
+        final double maxUsageWithWeightPercentage =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf) * 100;
+
+        if (maxUsageWithWeightPercentage > overloadThresholdPercentage) {
+            log.warn("Broker {} is overloaded: max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+            return Double.POSITIVE_INFINITY;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+        }
+        return maxUsageWithWeightPercentage;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        double minScore = Double.POSITIVE_INFINITY;
+        // Maintain of list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+
+        for (String broker : candidates) {
+            final BrokerData brokerData = loadData.getBrokerData().get(broker);
+            final double score = getScore(broker, brokerData, conf);
+            if (score == Double.POSITIVE_INFINITY) {
+                final LocalBrokerData localData = brokerData.getLocalData();
+                log.warn(
+                        "Broker {} is overloaded: CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                                + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                                + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                        broker, localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                        localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                        localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                        conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                        conf.getLoadBalancerBandwithInResourceWeight(),
+                        conf.getLoadBalancerBandwithOutResourceWeight());
+            }
+            if (score < minScore) {
+                bestBrokers.clear();

Review Comment:
   Choosing a random broker could work much better when many bundles are being unloaded; see the discussion: https://github.com/apache/pulsar/pull/14971#discussion_r897934631





[GitHub] [pulsar] HQebupt commented on pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#issuecomment-1170802259

   /pulsarbot run-failure-checks




[GitHub] [pulsar] heesung-sn commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r912145116


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // A broker's max resource usage with weight using its historical load and short-term load data with weight.
+    private double getMaxResourceUsageWithWeight(final String broker, final BrokerData brokerData,
+                                         final ServiceConfiguration conf) {
+        final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+        final double maxUsageWithWeight =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf);
+
+        if (maxUsageWithWeight > overloadThreshold) {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            log.warn(
+                    "Broker {} is overloaded, max resource usage with weight percentage: {}%, "
+                            + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                            + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                            + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                    broker, maxUsageWithWeight * 100,
+                    localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                    localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                    localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeight * 100);
+        }
+        return maxUsageWithWeight;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     * This method is not thread safety.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        // Maintain of list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+        double totalUsage = 0.0d;
+        for (String broker : candidates) {
+            BrokerData brokerData = loadData.getBrokerData().get(broker);
+            double usageWithWeight = getMaxResourceUsageWithWeight(broker, brokerData, conf);
+            totalUsage += usageWithWeight;
+        }
+
+        final double avgUsage = totalUsage / candidates.size();
+        final double diffThreshold =
+                conf.getLoadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage() / 100.0;
+        brokerAvgResourceUsageWithWeight.forEach((broker, avgResUsage) -> {
+            if (avgResUsage + diffThreshold <= avgUsage) {

Review Comment:
   I think this algorithm is better than the `minScore` approach, but we still have the `small randomization pool` issue here.
   Example: b1:4, b2:20, b3:20
   
   Here b1 will still be selected repeatedly until its updated load reaches the leader, even though all three brokers should be candidates because their loads are low. (As we discussed, when many bundles are unloaded in a short period, this can leave b1 overloaded, e.g. b1:90%, b2:20%, b3:20%.)
   
   Can we apply a high_load_threshold_filter followed by random selection?
   
   One could also implement a random selection weighted by usage and selection count, which favors brokers that are underloaded and have been selected less often.
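
The filter-then-random idea suggested above can be sketched as follows. This is only an illustration under stated assumptions, not code from the PR: the class `ThresholdFilterRandomSelection`, the method `select`, and the 0.85 threshold are hypothetical, and usages are expressed as fractions (0.20 means 20%).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;

public class ThresholdFilterRandomSelection {

    // Hypothetical helper: keep every broker whose usage is below the
    // high-load threshold, then pick one of the survivors uniformly at random.
    public static Optional<String> select(Map<String, Double> usageByBroker,
                                          double highLoadThreshold,
                                          Random random) {
        List<String> pool = new ArrayList<>();
        for (Map.Entry<String, Double> entry : usageByBroker.entrySet()) {
            if (entry.getValue() < highLoadThreshold) {
                pool.add(entry.getKey());
            }
        }
        if (pool.isEmpty()) {
            // Every broker is at or above the threshold; the caller must fall back.
            return Optional.empty();
        }
        return Optional.of(pool.get(random.nextInt(pool.size())));
    }

    public static void main(String[] args) {
        // Load data as seen by the leader; it stays stale between load reports.
        Map<String, Double> usage = new LinkedHashMap<>();
        usage.put("b1", 0.04);
        usage.put("b2", 0.20);
        usage.put("b3", 0.20);

        // Assign many bundles against the same stale snapshot and count the picks.
        Random random = new Random();
        Map<String, Integer> picks = new LinkedHashMap<>();
        for (int i = 0; i < 300; i++) {
            picks.merge(select(usage, 0.85, random).get(), 1, Integer::sum);
        }
        System.out.println(picks);
    }
}
```

Because all three brokers pass the filter, assignments made against a stale snapshot are spread across them instead of piling onto b1. The trade-off is that the selection no longer prefers the least-loaded broker at all, which is what the weighted variant mentioned above would address.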





[GitHub] [pulsar] HQebupt commented on pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#issuecomment-1178953649

   /pulsarbot run-failure-checks




[GitHub] [pulsar] HQebupt commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r912311755


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // A broker's max resource usage with weight using its historical load and short-term load data with weight.
+    private double getMaxResourceUsageWithWeight(final String broker, final BrokerData brokerData,
+                                         final ServiceConfiguration conf) {
+        final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+        final double maxUsageWithWeight =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf);
+
+        if (maxUsageWithWeight > overloadThreshold) {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            log.warn(
+                    "Broker {} is overloaded, max resource usage with weight percentage: {}%, "
+                            + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                            + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                            + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                    broker, maxUsageWithWeight * 100,
+                    localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                    localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                    localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeight * 100);
+        }
+        return maxUsageWithWeight;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     * This method is not thread safety.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        // Maintain of list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+        double totalUsage = 0.0d;
+        for (String broker : candidates) {
+            BrokerData brokerData = loadData.getBrokerData().get(broker);
+            double usageWithWeight = getMaxResourceUsageWithWeight(broker, brokerData, conf);
+            totalUsage += usageWithWeight;
+        }
+
+        final double avgUsage = totalUsage / candidates.size();
+        final double diffThreshold =
+                conf.getLoadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage() / 100.0;
+        brokerAvgResourceUsageWithWeight.forEach((broker, avgResUsage) -> {
+            if (avgResUsage + diffThreshold <= avgUsage) {

Review Comment:
   You are right. This algorithm was originally intended to produce a larger `randomization pool`, but it doesn't work well for small clusters. One compromise is to set `diffThreshold` to a negative number, which expands the `randomization pool`.
    (e.g. with b1:4, b2:20, b3:20 and `diffThreshold`=-15, all of the brokers become candidates.) 
   The `diffThreshold` parameter takes effect dynamically, so it can be tuned at runtime according to the state of the cluster. Do you have a better suggestion? :)
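
The effect of a negative `diffThreshold` on the example can be checked with a small sketch. It reuses only the inequality from the quoted diff (`avgResUsage + diffThreshold <= avgUsage`); the class `DiffThresholdPool`, the method `pool`, and the positive value 10 used for contrast are assumptions made for illustration. Values are fractions, matching the `/ 100.0` conversion in the diff.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DiffThresholdPool {

    // Hypothetical helper: a broker joins the pool when
    // usage + diffThreshold <= average usage across all brokers.
    public static List<String> pool(Map<String, Double> usageByBroker, double diffThreshold) {
        double total = 0.0;
        for (double usage : usageByBroker.values()) {
            total += usage;
        }
        final double avgUsage = total / usageByBroker.size();

        List<String> pool = new ArrayList<>();
        for (Map.Entry<String, Double> entry : usageByBroker.entrySet()) {
            if (entry.getValue() + diffThreshold <= avgUsage) {
                pool.add(entry.getKey());
            }
        }
        return pool;
    }

    public static void main(String[] args) {
        Map<String, Double> usage = new LinkedHashMap<>();
        usage.put("b1", 0.04);
        usage.put("b2", 0.20);
        usage.put("b3", 0.20);

        // Average usage is about 0.147.
        System.out.println(pool(usage, 0.10));   // prints [b1]
        System.out.println(pool(usage, -0.15));  // prints [b1, b2, b3]
    }
}
```

With the positive threshold only b1 sits far enough below the average to qualify, which is the small-pool case raised in the review. With -15 (here -0.15) every broker satisfies the inequality, so the random choice is spread over all three.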





[GitHub] [pulsar] HQebupt commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r911694424


##########
conf/broker.conf:
##########
@@ -1171,6 +1171,9 @@ defaultNamespaceBundleSplitAlgorithm=range_equally_divide
 # load shedding strategy, support OverloadShedder and ThresholdShedder, default is ThresholdShedder since 2.10.0
 loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder
 
+# load balance placement strategy, support LeastLongTermMessageRate and LeastResourceUsageWithWeight
+loadBalancerLoadPlacementStrategy=org.apache.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate

Review Comment:
   I don't think so. The new strategy needs more practice in production environments before it becomes the default.





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r912527219


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // A broker's max resource usage with weight using its historical load and short-term load data with weight.
+    private double getMaxResourceUsageWithWeight(final String broker, final BrokerData brokerData,
+                                         final ServiceConfiguration conf) {
+        final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+        final double maxUsageWithWeight =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf);
+
+        if (maxUsageWithWeight > overloadThreshold) {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            log.warn(
+                    "Broker {} is overloaded, max resource usage with weight percentage: {}%, "
+                            + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                            + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                            + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                    broker, maxUsageWithWeight * 100,
+                    localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                    localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                    localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeight * 100);
+        }
+        return maxUsageWithWeight;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     * This method is not thread-safe.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        // Maintain a list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+        double totalUsage = 0.0d;
+        for (String broker : candidates) {
+            BrokerData brokerData = loadData.getBrokerData().get(broker);
+            double usageWithWeight = getMaxResourceUsageWithWeight(broker, brokerData, conf);
+            totalUsage += usageWithWeight;
+        }
+
+        final double avgUsage = totalUsage / candidates.size();
+        final double diffThreshold =
+                conf.getLoadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage() / 100.0;
+        brokerAvgResourceUsageWithWeight.forEach((broker, avgResUsage) -> {
+            if (avgResUsage + diffThreshold <= avgUsage) {

Review Comment:
   It is good that we have a fallback case that just randomly selects any broker when the pool is empty.
   
   Please update the doc to further explain the edge cases.
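
The fallback itself is not visible in the quoted hunk, so the following is only a sketch of what such a step might look like, assuming the strategy falls back to the full candidate set when no broker passes the threshold check. `FallbackSketch` and `pick` are hypothetical names.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of the fallback described in the comment above.
class FallbackSketch {
    // Picks randomly from bestBrokers; if that pool is empty, picks from all candidates.
    static Optional<String> pick(List<String> bestBrokers, List<String> candidates) {
        List<String> pool = bestBrokers.isEmpty() ? candidates : bestBrokers;
        if (pool.isEmpty()) {
            // No candidates at all: nothing can be selected.
            return Optional.empty();
        }
        int index = ThreadLocalRandom.current().nextInt(pool.size());
        return Optional.of(pool.get(index));
    }

    public static void main(String[] args) {
        List<String> candidates = List.of("b1", "b2", "b3");
        // Empty best-broker pool: any of the three candidates may be printed.
        System.out.println(pick(List.of(), candidates));
        // Non-empty pool: the fallback is not used.
        System.out.println(pick(List.of("b1"), candidates)); // prints Optional[b1]
    }
}
```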





[GitHub] [pulsar] HQebupt commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r915748217


##########
conf/broker.conf:
##########
@@ -1171,12 +1171,22 @@ defaultNamespaceBundleSplitAlgorithm=range_equally_divide
 # load shedding strategy, support OverloadShedder and ThresholdShedder, default is ThresholdShedder since 2.10.0
 loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder
 
+# load balance placement strategy, support LeastLongTermMessageRate and LeastResourceUsageWithWeight
+loadBalancerLoadPlacementStrategy=org.apache.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate
+
 # The broker resource usage threshold.
 # When the broker resource usage is greater than the pulsar cluster average resource usage,
 # the threshold shedder will be triggered to offload bundles from the broker.
 # It only takes effect in the ThresholdShedder strategy.
 loadBalancerBrokerThresholdShedderPercentage=10
 
+# The broker average resource usage difference threshold.
+# The average resource usage difference threshold used to decide whether a broker is a best candidate in LeastResourceUsageWithWeight.
+# (e.g., broker1 with 10% resource usage with weight, broker2 with 30%, and broker3 with 80% give a 40% average resource usage.
+# The placement strategy can select broker1 and broker2 as best candidates.)
+# It only takes effect in the LeastResourceUsageWithWeight strategy.
+loadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage=10

Review Comment:
   You are right. Done.





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r912145116


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // A broker's max resource usage with weight using its historical load and short-term load data with weight.
+    private double getMaxResourceUsageWithWeight(final String broker, final BrokerData brokerData,
+                                         final ServiceConfiguration conf) {
+        final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+        final double maxUsageWithWeight =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf);
+
+        if (maxUsageWithWeight > overloadThreshold) {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            log.warn(
+                    "Broker {} is overloaded, max resource usage with weight percentage: {}%, "
+                            + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                            + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                            + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                    broker, maxUsageWithWeight * 100,
+                    localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                    localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                    localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeight * 100);
+        }
+        return maxUsageWithWeight;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     * This method is not thread-safe.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        // Maintain a list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+        double totalUsage = 0.0d;
+        for (String broker : candidates) {
+            BrokerData brokerData = loadData.getBrokerData().get(broker);
+            double usageWithWeight = getMaxResourceUsageWithWeight(broker, brokerData, conf);
+            totalUsage += usageWithWeight;
+        }
+
+        final double avgUsage = totalUsage / candidates.size();
+        final double diffThreshold =
+                conf.getLoadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage() / 100.0;
+        brokerAvgResourceUsageWithWeight.forEach((broker, avgResUsage) -> {
+            if (avgResUsage + diffThreshold <= avgUsage) {

Review Comment:
   I think this algo is better than the `minScore`, but we still have the `small randomization pool` issue here.
   Example: b1:4, b2:20, b3:20
   
   Here, b1 will still be selected repeatedly until b1's updated load reaches the leader, even though all of these brokers should be candidates because their loads are low. (As we discussed, when many bundles are unloaded in a short period, b1 might end up overloaded, e.g. b1:90, b2:20, b3:20.)
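
The repeated-selection concern can be illustrated with a small simulation, assuming the leader keeps using one unchanged usage snapshot while many bundles are placed. The names are hypothetical, values are in percent, and the empty-pool fallback to all brokers is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical simulation; class and method names are illustrative only.
class StaleLoadSketch {
    // Places `bundles` bundles using one fixed (stale) usage snapshot, in percent.
    static Map<String, Integer> place(Map<String, Double> usagePercent, double diffThresholdPercent,
                                      int bundles, Random random) {
        double total = 0.0;
        for (double usage : usagePercent.values()) {
            total += usage;
        }
        double avg = total / usagePercent.size();
        List<String> pool = new ArrayList<>();
        Map<String, Integer> placed = new LinkedHashMap<>();
        for (Map.Entry<String, Double> e : usagePercent.entrySet()) {
            placed.put(e.getKey(), 0);
            if (e.getValue() + diffThresholdPercent <= avg) {
                pool.add(e.getKey());
            }
        }
        if (pool.isEmpty()) {
            pool.addAll(usagePercent.keySet()); // assumed fallback: any broker
        }
        if (pool.isEmpty()) {
            return placed; // no brokers at all
        }
        // The snapshot never changes, so the pool is identical for every placement.
        for (int i = 0; i < bundles; i++) {
            placed.merge(pool.get(random.nextInt(pool.size())), 1, Integer::sum);
        }
        return placed;
    }

    public static void main(String[] args) {
        Map<String, Double> usage = new LinkedHashMap<>();
        usage.put("b1", 4.0);
        usage.put("b2", 20.0);
        usage.put("b3", 20.0);
        System.out.println(place(usage, 10.0, 50, new Random(42))); // prints {b1=50, b2=0, b3=0}
    }
}
```

Because the pool contains only b1 until the load data is refreshed, every one of the 50 placements lands on b1.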
   





[GitHub] [pulsar] HQebupt commented on pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#issuecomment-1176951753

   @Demogorgon314 Could you please review the PR again?




[GitHub] [pulsar] heesung-sn commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r912395682


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // A broker's max resource usage with weight using its historical load and short-term load data with weight.
+    private double getMaxResourceUsageWithWeight(final String broker, final BrokerData brokerData,
+                                         final ServiceConfiguration conf) {
+        final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+        final double maxUsageWithWeight =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf);
+
+        if (maxUsageWithWeight > overloadThreshold) {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            log.warn(
+                    "Broker {} is overloaded, max resource usage with weight percentage: {}%, "
+                            + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                            + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                            + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                    broker, maxUsageWithWeight * 100,
+                    localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                    localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                    localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeight * 100);
+        }
+        return maxUsageWithWeight;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     * This method is not thread-safe.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        // Maintain a list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+        double totalUsage = 0.0d;
+        for (String broker : candidates) {
+            BrokerData brokerData = loadData.getBrokerData().get(broker);
+            double usageWithWeight = getMaxResourceUsageWithWeight(broker, brokerData, conf);
+            totalUsage += usageWithWeight;
+        }
+
+        final double avgUsage = totalUsage / candidates.size();
+        final double diffThreshold =
+                conf.getLoadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage() / 100.0;
+        brokerAvgResourceUsageWithWeight.forEach((broker, avgResUsage) -> {
+            if (avgResUsage + diffThreshold <= avgUsage) {

Review Comment:
   Please explain the negative value in the doc, in the config definition.
   
   Another edge case is that with a negative threshold, it might select overloaded brokers.
   Example: threshold=-15, {b1=25, b2=100, b3=100, b4=100, b5=100} => the overloaded b2, b3, b4, b5 will be selected as well.
   
   And another edge case is that with a low variance in the usage distribution, it might select none of the brokers.
   Example: threshold=10, {b1=24, b2=25, b3=26} => none will be selected.
   
   If we care more about a bigger randomization pool, one could just randomly select any broker below LoadBalancerBrokerOverloadedThresholdPercentage, which could be just another strategy.
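
The alternative suggested above (pick among brokers that are not overloaded) could be sketched as follows. This is not part of the PR; the class name, the method name, and the overload threshold of 85 are assumptions for illustration, and a broker counts as overloaded when its usage is strictly above the threshold, mirroring the `>` check in the quoted code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a "not overloaded" candidate filter; not part of the PR.
class BelowOverloadSketch {
    // Keeps every broker whose usage (percent) does not exceed the overload threshold.
    static List<String> notOverloaded(Map<String, Double> usagePercent, double overloadedThresholdPercent) {
        List<String> pool = new ArrayList<>();
        for (Map.Entry<String, Double> e : usagePercent.entrySet()) {
            if (e.getValue() <= overloadedThresholdPercent) {
                pool.add(e.getKey());
            }
        }
        return pool;
    }

    public static void main(String[] args) {
        Map<String, Double> skewed = new LinkedHashMap<>();
        skewed.put("b1", 25.0);
        skewed.put("b2", 100.0);
        skewed.put("b3", 100.0);
        skewed.put("b4", 100.0);
        skewed.put("b5", 100.0);
        System.out.println(notOverloaded(skewed, 85.0)); // prints [b1]

        Map<String, Double> flat = new LinkedHashMap<>();
        flat.put("b1", 24.0);
        flat.put("b2", 25.0);
        flat.put("b3", 26.0);
        System.out.println(notOverloaded(flat, 85.0)); // prints [b1, b2, b3]
    }
}
```

Under these assumptions the filter avoids both edge cases: the overloaded brokers are excluded in the skewed cluster, and the low-variance cluster keeps all three brokers in the pool.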
   
   
   





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r912527219


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // A broker's max resource usage with weight using its historical load and short-term load data with weight.
+    private double getMaxResourceUsageWithWeight(final String broker, final BrokerData brokerData,
+                                         final ServiceConfiguration conf) {
+        final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+        final double maxUsageWithWeight =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf);
+
+        if (maxUsageWithWeight > overloadThreshold) {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            log.warn(
+                    "Broker {} is overloaded, max resource usage with weight percentage: {}%, "
+                            + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                            + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                            + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                    broker, maxUsageWithWeight * 100,
+                    localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                    localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                    localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeight * 100);
+        }
+        return maxUsageWithWeight;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     * This method is not thread safety.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        // Maintain of list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+        double totalUsage = 0.0d;
+        for (String broker : candidates) {
+            BrokerData brokerData = loadData.getBrokerData().get(broker);
+            double usageWithWeight = getMaxResourceUsageWithWeight(broker, brokerData, conf);
+            totalUsage += usageWithWeight;
+        }
+
+        final double avgUsage = totalUsage / candidates.size();
+        final double diffThreshold =
+                conf.getLoadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage() / 100.0;
+        brokerAvgResourceUsageWithWeight.forEach((broker, avgResUsage) -> {
+            if (avgResUsage + diffThreshold <= avgUsage) {

Review Comment:
   It is good that we have a fallback case that randomly selects any broker when the list is empty.
   
   But with threshold = -10, {b1=1, b2=25, b3=25, b4=25 ... b10=25}, then 





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r915386190


##########
conf/broker.conf:
##########
@@ -1171,12 +1171,22 @@ defaultNamespaceBundleSplitAlgorithm=range_equally_divide
 # load shedding strategy, support OverloadShedder and ThresholdShedder, default is ThresholdShedder since 2.10.0
 loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder
 
+# load balance placement strategy, support LeastLongTermMessageRate and LeastResourceUsageWithWeight
+loadBalancerLoadPlacementStrategy=org.apache.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate
+
 # The broker resource usage threshold.
 # When the broker resource usage is greater than the pulsar cluster average resource usage,
 # the threshold shedder will be triggered to offload bundles from the broker.
 # It only takes effect in the ThresholdShedder strategy.
 loadBalancerBrokerThresholdShedderPercentage=10
 
+# The broker average resource usage difference threshold.
+# Average resource usage difference threshold to determine a broker whether to be a best candidate in LeastResourceUsageWithWeight.
+# (eg: broker1 with 10% resource usage with weight and broker2 with 30% and broker3 with 80% will have 40% average resource usage.
+# The placement strategy can select broker1 and broker2 as best candidates.)
+# It only takes effect in the LeastResourceUsageWithWeight strategy.
+loadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage=10

Review Comment:
   I guess the config should be `loadBalancerAverageResourceUsageDifferenceThresholdPercentage`?





[GitHub] [pulsar] HQebupt commented on pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#issuecomment-1178599180

   /pulsarbot run-failure-checks




[GitHub] [pulsar] heesung-sn commented on pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#issuecomment-1180553419

   LGTM




[GitHub] [pulsar] HQebupt commented on pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#issuecomment-1176308586

   /pulsarbot run-failure-checks




[GitHub] [pulsar] HQebupt commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r912311777


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // A broker's max resource usage with weight using its historical load and short-term load data with weight.
+    private double getMaxResourceUsageWithWeight(final String broker, final BrokerData brokerData,
+                                         final ServiceConfiguration conf) {
+        final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+        final double maxUsageWithWeight =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf);
+
+        if (maxUsageWithWeight > overloadThreshold) {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            log.warn(
+                    "Broker {} is overloaded, max resource usage with weight percentage: {}%, "
+                            + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                            + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                            + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                    broker, maxUsageWithWeight * 100,
+                    localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                    localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                    localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeight * 100);
+        }
+        return maxUsageWithWeight;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     * This method is not thread safety.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        // Maintain of list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+        double totalUsage = 0.0d;
+        for (String broker : candidates) {
+            BrokerData brokerData = loadData.getBrokerData().get(broker);
+            double usageWithWeight = getMaxResourceUsageWithWeight(broker, brokerData, conf);
+            totalUsage += usageWithWeight;
+        }
+
+        final double avgUsage = totalUsage / candidates.size();
+        final double diffThreshold =
+                conf.getLoadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage() / 100.0;
+        brokerAvgResourceUsageWithWeight.forEach((broker, avgResUsage) -> {
+            if (avgResUsage + diffThreshold <= avgUsage) {

Review Comment:
   @heesung-sn @dragonls PTAL





[GitHub] [pulsar] HQebupt commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r912430031


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // A broker's max resource usage with weight using its historical load and short-term load data with weight.
+    private double getMaxResourceUsageWithWeight(final String broker, final BrokerData brokerData,
+                                         final ServiceConfiguration conf) {
+        final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+        final double maxUsageWithWeight =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf);
+
+        if (maxUsageWithWeight > overloadThreshold) {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            log.warn(
+                    "Broker {} is overloaded, max resource usage with weight percentage: {}%, "
+                            + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                            + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                            + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                    broker, maxUsageWithWeight * 100,
+                    localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                    localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                    localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeight * 100);
+        }
+        return maxUsageWithWeight;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     * This method is not thread safety.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        // Maintain of list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+        double totalUsage = 0.0d;
+        for (String broker : candidates) {
+            BrokerData brokerData = loadData.getBrokerData().get(broker);
+            double usageWithWeight = getMaxResourceUsageWithWeight(broker, brokerData, conf);
+            totalUsage += usageWithWeight;
+        }
+
+        final double avgUsage = totalUsage / candidates.size();
+        final double diffThreshold =
+                conf.getLoadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage() / 100.0;
+        brokerAvgResourceUsageWithWeight.forEach((broker, avgResUsage) -> {
+            if (avgResUsage + diffThreshold <= avgUsage) {

Review Comment:
   Truly. An algorithm that applies to many cases is good, but it cannot cover all cases.
   > Another edge case is that, with a negative threshold, it might select overloaded brokers.
   > Example: threshold = -15, {b1=25, b2=100, b3=100, b4=100, b5=100} => b2, b3, b4, b5 will be selected.
   
   The operator needs to know the status of the cluster and configure the threshold appropriately.
   - A negative threshold applies when a large number of brokers are lightly loaded.
   - A positive threshold applies when a large number of brokers are heavily loaded.
   
   > Another edge case is that, with a low variance in the usage distribution, it might select none of the brokers.
   > Example: threshold = 10, {b1=24, b2=25, b3=26} => none will be selected.
   
   If none is selected, a broker is assigned randomly.
   <img width="854" alt="image" src="https://user-images.githubusercontent.com/4970972/177025193-55f818ca-d5a4-4e2c-b192-39590dbbfe5d.png">
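
As a rough illustration of the "assign randomly if none is selected" behaviour, here is a minimal sketch. The actual fallback appears in this thread only as a screenshot, so everything below is an assumption: the class `RandomFallbackSketch` and its methods are hypothetical names, values are plain percentages, and the fallback pool is assumed to be all candidates.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of threshold selection with a random fallback; not the PR's code.
class RandomFallbackSketch {
    static Optional<String> pick(Map<String, Double> usagePercent, double thresholdPercent) {
        if (usagePercent.isEmpty()) {
            return Optional.empty();
        }
        double avg = usagePercent.values().stream()
                .mapToDouble(Double::doubleValue).average().orElse(0.0);

        // Brokers that are at least `thresholdPercent` points below the average.
        List<String> best = new ArrayList<>();
        for (Map.Entry<String, Double> entry : usagePercent.entrySet()) {
            if (entry.getValue() + thresholdPercent <= avg) {
                best.add(entry.getKey());
            }
        }

        // Assumed fallback: if nobody qualified, choose among all candidates.
        if (best.isEmpty()) {
            best.addAll(usagePercent.keySet());
        }
        return Optional.of(best.get(ThreadLocalRandom.current().nextInt(best.size())));
    }

    // Builds an ordered map b1, b2, ... from the given usage values.
    static Map<String, Double> usage(double... values) {
        Map<String, Double> map = new LinkedHashMap<>();
        for (int i = 0; i < values.length; i++) {
            map.put("b" + (i + 1), values[i]);
        }
        return map;
    }
}
```

With the low-variance input {b1=24, b2=25, b3=26} and threshold 10, `pick` returns any one of the three brokers. With the broker.conf example (10%, 30%, 80% and threshold 10) the average is 40%, so only b1 or b2 can be returned.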
   
   





[GitHub] [pulsar] dragonls commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
dragonls commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r912590726


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // A broker's max resource usage with weight using its historical load and short-term load data with weight.
+    private double getMaxResourceUsageWithWeight(final String broker, final BrokerData brokerData,
+                                         final ServiceConfiguration conf) {
+        final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+        final double maxUsageWithWeight =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf);
+
+        if (maxUsageWithWeight > overloadThreshold) {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            log.warn(
+                    "Broker {} is overloaded, max resource usage with weight percentage: {}%, "
+                            + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                            + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                            + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                    broker, maxUsageWithWeight * 100,
+                    localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                    localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                    localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeight * 100);
+        }
+        return maxUsageWithWeight;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     * This method is not thread-safe.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        // Maintain a list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+        double totalUsage = 0.0d;
+        for (String broker : candidates) {
+            BrokerData brokerData = loadData.getBrokerData().get(broker);
+            double usageWithWeight = getMaxResourceUsageWithWeight(broker, brokerData, conf);
+            totalUsage += usageWithWeight;
+        }
+
+        final double avgUsage = totalUsage / candidates.size();
+        final double diffThreshold =
+                conf.getLoadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage() / 100.0;
+        brokerAvgResourceUsageWithWeight.forEach((broker, avgResUsage) -> {
+            if (avgResUsage + diffThreshold <= avgUsage) {

Review Comment:
   I would prefer to keep the algorithm simple; otherwise it is hard to debug and understand.
   
   The `diffThreshold` might filter out all candidates, in which case the broker is selected at random from all candidates and may be a highly loaded one.
   
   The principle of selection should be to favor brokers with lower load, for example by selecting a broker whose load is below the average.
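A minimal sketch of the simpler rule suggested above: keep every broker whose usage is at or below the average and pick one at random. The class name, method names, and the injected `Random` are hypothetical. The fallback to all candidates is only a guard against floating-point rounding, since in exact arithmetic at least one broker is always at or below the average.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;

// Hypothetical helper for illustration only; not part of PR #16281.
class BelowAverageSketch {

    // Brokers whose usage does not exceed the average usage of all candidates.
    static List<String> atOrBelowAverage(Map<String, Double> usage) {
        double avg = usage.values().stream()
                .mapToDouble(Double::doubleValue).average().orElse(0.0);
        List<String> pool = new ArrayList<>();
        for (Map.Entry<String, Double> e : usage.entrySet()) {
            if (e.getValue() <= avg) {
                pool.add(e.getKey());
            }
        }
        return pool;
    }

    // Uniform random pick among the brokers at or below the average.
    static Optional<String> select(Map<String, Double> usage, Random random) {
        if (usage.isEmpty()) {
            return Optional.empty();
        }
        List<String> pool = atOrBelowAverage(usage);
        if (pool.isEmpty()) {
            // Only reachable through rounding error in the average.
            pool = new ArrayList<>(usage.keySet());
        }
        return Optional.of(pool.get(random.nextInt(pool.size())));
    }
}
```

For usages {b1=24, b2=25, b3=26} this keeps b1 and b2, so the pool does not become empty when the usage variance is low.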





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r912145116


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // A broker's max resource usage with weight using its historical load and short-term load data with weight.
+    private double getMaxResourceUsageWithWeight(final String broker, final BrokerData brokerData,
+                                         final ServiceConfiguration conf) {
+        final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+        final double maxUsageWithWeight =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf);
+
+        if (maxUsageWithWeight > overloadThreshold) {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            log.warn(
+                    "Broker {} is overloaded, max resource usage with weight percentage: {}%, "
+                            + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                            + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                            + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                    broker, maxUsageWithWeight * 100,
+                    localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                    localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                    localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeight * 100);
+        }
+        return maxUsageWithWeight;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     * This method is not thread-safe.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        // Maintain a list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+        double totalUsage = 0.0d;
+        for (String broker : candidates) {
+            BrokerData brokerData = loadData.getBrokerData().get(broker);
+            double usageWithWeight = getMaxResourceUsageWithWeight(broker, brokerData, conf);
+            totalUsage += usageWithWeight;
+        }
+
+        final double avgUsage = totalUsage / candidates.size();
+        final double diffThreshold =
+                conf.getLoadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage() / 100.0;
+        brokerAvgResourceUsageWithWeight.forEach((broker, avgResUsage) -> {
+            if (avgResUsage + diffThreshold <= avgUsage) {

Review Comment:
   I think this algorithm is better than the `minScore` approach, but we still have the small randomization pool issue here.
   Example: b1:4, b2:20, b3:20
   
   Here, b1 will still be repeatedly selected until b1's load is updated on the leader, even though all of these brokers should be candidates because their loads are low. (As we discussed, when many bundles are unloaded in a short period, this might lead to b1 ending up overloaded, e.g. b1:90, b2:20, b3:20.)
   
   Can we apply a high-load threshold filter and then select randomly?
   
   I think one could also implement a random selection weighted by usage and selection count, so that underloaded brokers and brokers selected less often within one update cycle are favored.
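One way the proposal above could look, as a rough sketch rather than the PR's implementation: drop brokers at or above a high-load threshold, then pick among the rest with probability proportional to their headroom (`1 - usage`). Usages are assumed to be fractions in [0, 1]. All names here are hypothetical, and the selection-count weighting mentioned in the comment is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;

// Hypothetical helper for illustration only; not part of PR #16281.
class WeightedRandomSketch {

    // Keeps only brokers whose usage is strictly below the high-load threshold.
    static List<String> filterHighLoad(Map<String, Double> usage, double highLoadThreshold) {
        List<String> pool = new ArrayList<>();
        for (Map.Entry<String, Double> e : usage.entrySet()) {
            if (e.getValue() < highLoadThreshold) {
                pool.add(e.getKey());
            }
        }
        return pool;
    }

    // Selection weight: the unused fraction of the broker, never negative.
    static double headroom(double usage) {
        return Math.max(0.0, 1.0 - usage);
    }

    // Weighted random pick among brokers that passed the high-load filter.
    static Optional<String> select(Map<String, Double> usage, double highLoadThreshold, Random random) {
        List<String> pool = filterHighLoad(usage, highLoadThreshold);
        if (pool.isEmpty()) {
            return Optional.empty();
        }
        double totalWeight = 0.0;
        for (String broker : pool) {
            totalWeight += headroom(usage.get(broker));
        }
        if (totalWeight <= 0.0) {
            // No headroom anywhere: fall back to a uniform pick.
            return Optional.of(pool.get(random.nextInt(pool.size())));
        }
        double r = random.nextDouble() * totalWeight;
        for (String broker : pool) {
            r -= headroom(usage.get(broker));
            if (r < 0.0) {
                return Optional.of(broker);
            }
        }
        // Guard against rounding: return the last broker in the pool.
        return Optional.of(pool.get(pool.size() - 1));
    }
}
```

For the example b1:4, b2:20, b3:20 (as fractions 0.04, 0.20, 0.20), the weights are 0.96, 0.80, and 0.80, so b1 is favored but b2 and b3 still receive a substantial share of assignments between load updates.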





[GitHub] [pulsar] codelipenghui merged pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #16281:
URL: https://github.com/apache/pulsar/pull/16281




[GitHub] [pulsar] HQebupt commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r911685426


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // Form a score for a broker using its historical load and short-term load data with weight.
+    // Any broker at (or above) the overload threshold will have a score of POSITIVE_INFINITY.
+    private double getScore(final String broker, final BrokerData brokerData, final ServiceConfiguration conf) {
+        final double overloadThresholdPercentage = conf.getLoadBalancerBrokerOverloadedThresholdPercentage();
+        final double maxUsageWithWeightPercentage =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf) * 100;
+
+        if (maxUsageWithWeightPercentage > overloadThresholdPercentage) {
+            log.warn("Broker {} is overloaded: max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+            return Double.POSITIVE_INFINITY;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+        }
+        return maxUsageWithWeightPercentage;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        double minScore = Double.POSITIVE_INFINITY;
+        // Maintain a list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+
+        for (String broker : candidates) {
+            final BrokerData brokerData = loadData.getBrokerData().get(broker);
+            final double score = getScore(broker, brokerData, conf);
+            if (score == Double.POSITIVE_INFINITY) {
+                final LocalBrokerData localData = brokerData.getLocalData();
+                log.warn(
+                        "Broker {} is overloaded: CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                                + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                                + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                        broker, localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                        localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                        localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                        conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                        conf.getLoadBalancerBandwithInResourceWeight(),
+                        conf.getLoadBalancerBandwithOutResourceWeight());
+            }
+            if (score < minScore) {
+                bestBrokers.clear();

Review Comment:
   > The least loaded broker will be repeatedly selected until the broker load is updated to the leader.
   
   That is indeed possible, but it looks like expected behavior. A broker uploads its load report to the leader when either of the following conditions is met:
   - The load change exceeds the 10% threshold
   - The 15-minute report interval has elapsed
   
   Therefore, I believe the load data is near real time.
   
   > I am wondering if filling bestBrokers only with minScore brokers is too much limiting the randomization pool.
   
   Choosing between a random broker and the minScore broker requires a lot of testing. As far as our cluster is concerned, the minScore broker algorithm works just fine: bundle unloading occurs less often than with `LeastLongTermMessageRate`.
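The two report conditions listed above can be sketched as a single predicate. This is a simplified illustration, not the broker's actual code: the class, method, and constant names are hypothetical, and treating the 10% threshold as an absolute change in percentage points is an assumption, since the comment does not say how the change is measured.

```java
// Hypothetical helper for illustration only; not part of PR #16281.
class LoadReportSketch {

    // Values taken from the comment above; treated here as fixed constants.
    static final double REPORT_THRESHOLD_PERCENT = 10.0;
    static final long MAX_INTERVAL_MINUTES = 15;

    // True when the broker would upload a new load report to the leader.
    static boolean shouldReport(double lastReportedPercent, double currentPercent,
                                long minutesSinceLastReport) {
        boolean intervalElapsed = minutesSinceLastReport >= MAX_INTERVAL_MINUTES;
        // Assumption: the change is measured in absolute percentage points.
        boolean loadChanged =
                Math.abs(currentPercent - lastReportedPercent) > REPORT_THRESHOLD_PERCENT;
        return intervalElapsed || loadChanged;
    }
}
```

Under these assumptions, a broker last reported at 4% that climbs to 12% within a few minutes does not trigger a new report, so the leader keeps seeing 4% during that window. That window is the one in which the least loaded broker can be selected repeatedly.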





[GitHub] [pulsar] HQebupt commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r911685426


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // Form a score for a broker using its historical load and short-term load data with weight.
+    // Any broker at (or above) the overload threshold will have a score of POSITIVE_INFINITY.
+    private double getScore(final String broker, final BrokerData brokerData, final ServiceConfiguration conf) {
+        final double overloadThresholdPercentage = conf.getLoadBalancerBrokerOverloadedThresholdPercentage();
+        final double maxUsageWithWeightPercentage =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf) * 100;
+
+        if (maxUsageWithWeightPercentage > overloadThresholdPercentage) {
+            log.warn("Broker {} is overloaded: max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+            return Double.POSITIVE_INFINITY;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+        }
+        return maxUsageWithWeightPercentage;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        double minScore = Double.POSITIVE_INFINITY;
+        // Maintain a list of all the best-scoring brokers and then randomly
+        // select one of them at the end.
+
+        for (String broker : candidates) {
+            final BrokerData brokerData = loadData.getBrokerData().get(broker);
+            final double score = getScore(broker, brokerData, conf);
+            if (score == Double.POSITIVE_INFINITY) {
+                final LocalBrokerData localData = brokerData.getLocalData();
+                log.warn(
+                        "Broker {} is overloaded: CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                                + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                                + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                        broker, localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                        localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                        localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                        conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                        conf.getLoadBalancerBandwithInResourceWeight(),
+                        conf.getLoadBalancerBandwithOutResourceWeight());
+            }
+            if (score < minScore) {
+                bestBrokers.clear();

Review Comment:
   > The least loaded broker will be repeatedly selected until the broker load is updated to the leader.
   
   That is indeed possible, but it is the expected behavior. A broker uploads its load report to the leader when either of the following conditions is met:
   - The load has changed by more than 10% since the last report.
   - 15 minutes have passed since the last report.

   Therefore, I believe the load data is near real time.
   
   > I am wondering if filling `bestBrokers` only with `minScore` brokers limits the randomization pool too much.
   
   Deciding between a random broker and the `minScore` broker would require a lot of testing. As far as our cluster is concerned, the `minScore` broker algorithm works just fine: bundle unloading occurs less often than with `LeastLongTermMessageRate`.
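
   A minimal sketch of the two reporting conditions listed above, to make the "near real time" reasoning concrete. The class, method names, and the single scalar load value are assumptions for illustration only; this is not the broker's actual reporting code, and the 10% and 15 minute values are simply the ones quoted in this comment.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; this is not Pulsar's actual reporting code.
class LoadReportTrigger {
    // Values quoted in the comment above; treated here as fixed constants.
    static final double CHANGE_THRESHOLD_PERCENT = 10.0;
    static final long MAX_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(15);

    // Relative change between the last reported load and the current load, in percent.
    static double percentChange(double lastReported, double current) {
        if (lastReported == 0) {
            return current == 0 ? 0 : Double.POSITIVE_INFINITY;
        }
        return 100.0 * Math.abs(current - lastReported) / lastReported;
    }

    // Report when the load moved by more than the threshold, or when the interval elapsed.
    static boolean shouldReport(double lastReported, double current,
                                long lastReportMillis, long nowMillis) {
        boolean loadChanged = percentChange(lastReported, current) > CHANGE_THRESHOLD_PERCENT;
        boolean intervalElapsed = nowMillis - lastReportMillis >= MAX_INTERVAL_MILLIS;
        return loadChanged || intervalElapsed;
    }

    public static void main(String[] args) {
        long fiveMinutes = TimeUnit.MINUTES.toMillis(5);
        long fifteenMinutes = TimeUnit.MINUTES.toMillis(15);
        System.out.println(shouldReport(50.0, 52.0, 0, fiveMinutes));    // small change, recent report
        System.out.println(shouldReport(50.0, 60.0, 0, fiveMinutes));    // large change
        System.out.println(shouldReport(50.0, 52.0, 0, fifteenMinutes)); // interval elapsed
    }
}
```

   Under these assumptions, a large load swing is reported immediately, while a slowly drifting broker can stay on stale data for up to the full interval, which is the window the reviewer's concern applies to.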





[GitHub] [pulsar] HQebupt commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r911706571


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java:
##########
@@ -59,6 +60,42 @@ public void testLeastLongTermMessageRate() {
         assertEquals(strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf), Optional.of("3"));
     }
 
+    // Test that least resource usage with weight works correctly.
+    public void testLeastResourceUsageWithWeight() {
+        BundleData bundleData = new BundleData();
+        BrokerData brokerData1 = initBrokerData(10, 100);
+        BrokerData brokerData2 = initBrokerData(20, 100);
+        BrokerData brokerData4 = initBrokerData(40, 100);
+        LoadData loadData = new LoadData();
+        Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
+        brokerDataMap.put("1", brokerData1);
+        brokerDataMap.put("2", brokerData2);
+        brokerDataMap.put("4", brokerData4);
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setLoadBalancerBrokerThresholdShedderPercentage(10);
+        conf.setLoadBalancerCPUResourceWeight(1.0);
+        conf.setLoadBalancerMemoryResourceWeight(0.1);
+        conf.setLoadBalancerDirectMemoryResourceWeight(0.1);
+        conf.setLoadBalancerBandwithInResourceWeight(1.0);
+        conf.setLoadBalancerBandwithOutResourceWeight(1.0);
+
+        ModularLoadManagerStrategy strategy = new LeastResourceUsageWithWeight();
+        assertEquals(strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf), Optional.of("1"));
+    }
+

Review Comment:
   Sure.





[GitHub] [pulsar] HQebupt commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r911860472


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // Form a score for a broker using its historical load and short-term load data with weight.
+    // Any broker above the overload threshold will have a score of POSITIVE_INFINITY.
+    private double getScore(final String broker, final BrokerData brokerData, final ServiceConfiguration conf) {
+        final double overloadThresholdPercentage = conf.getLoadBalancerBrokerOverloadedThresholdPercentage();
+        final double maxUsageWithWeightPercentage =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf) * 100;
+
+        if (maxUsageWithWeightPercentage > overloadThresholdPercentage) {
+            log.warn("Broker {} is overloaded: max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+            return Double.POSITIVE_INFINITY;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+        }
+        return maxUsageWithWeightPercentage;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        double minScore = Double.POSITIVE_INFINITY;
+        // Maintain a list of all the best-scoring brokers and then randomly
+        // select one of them at the end.
+
+        for (String broker : candidates) {
+            final BrokerData brokerData = loadData.getBrokerData().get(broker);
+            final double score = getScore(broker, brokerData, conf);
+            if (score == Double.POSITIVE_INFINITY) {
+                final LocalBrokerData localData = brokerData.getLocalData();
+                log.warn(
+                        "Broker {} is overloaded: CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                                + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                                + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                        broker, localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                        localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                        localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                        conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                        conf.getLoadBalancerBandwithInResourceWeight(),
+                        conf.getLoadBalancerBandwithOutResourceWeight());
+            }
+            if (score < minScore) {
+                bestBrokers.clear();

Review Comment:
   Good point. I updated the algorithm to use an average resource usage difference threshold to decide whether a broker is a best candidate. PTAL. @dragonls @heesung-sn
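
   The updated code is not quoted in this message, so the following is only one plausible reading of "average resource usage difference threshold": a broker joins the candidate pool when its averaged usage is at least a configured margin below the mean usage across candidates. The class name, method name, and the exact comparison are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; names and the exact comparison are assumptions, not the PR's code.
class AvgDiffThresholdSelection {
    // Returns brokers whose averaged usage is at least diffThresholdPercent below the mean.
    static List<String> bestCandidates(Map<String, Double> avgUsagePercentByBroker,
                                       double diffThresholdPercent) {
        List<String> best = new ArrayList<>();
        double mean = avgUsagePercentByBroker.values().stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(0.0);
        for (Map.Entry<String, Double> entry : avgUsagePercentByBroker.entrySet()) {
            if (entry.getValue() + diffThresholdPercent <= mean) {
                best.add(entry.getKey());
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Usage percentages mirror the 10/30/60 values used in the unit test in this thread.
        Map<String, Double> usage = new LinkedHashMap<>();
        usage.put("1", 10.0);
        usage.put("2", 30.0);
        usage.put("3", 60.0);
        System.out.println(bestCandidates(usage, 10.0));
        // Two lightly loaded brokers now both fall below the mean by the margin.
        usage.put("2", 15.0);
        System.out.println(bestCandidates(usage, 10.0));
    }
}
```

   With this rule the pool can hold several lightly loaded brokers instead of a single minimum, which widens the randomization pool. It can also come back empty when all brokers are close to the mean, so a caller would need a fallback such as choosing randomly among all candidates.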





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r913694552


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java:
##########
@@ -59,6 +60,76 @@ public void testLeastLongTermMessageRate() {
         assertEquals(strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf), Optional.of("3"));
     }
 
+    // Test that least resource usage with weight works correctly.
+    public void testLeastResourceUsageWithWeight() {
+        BundleData bundleData = new BundleData();
+        BrokerData brokerData1 = initBrokerData(10, 100);
+        BrokerData brokerData2 = initBrokerData(30, 100);
+        BrokerData brokerData3 = initBrokerData(60, 100);
+        LoadData loadData = new LoadData();
+        Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
+        brokerDataMap.put("1", brokerData1);
+        brokerDataMap.put("2", brokerData2);
+        brokerDataMap.put("3", brokerData3);
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setLoadBalancerBrokerThresholdShedderPercentage(5);

Review Comment:
   Why do we set this config?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);

Review Comment:
   We can use the `@Slf4j` annotation.





[GitHub] [pulsar] HQebupt commented on pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#issuecomment-1178456734

   /pulsarbot run-failure-checks




[GitHub] [pulsar] heesung-sn commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r911306976


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // Form a score for a broker using its historical load and short-term load data with weight.
+    // Any broker above the overload threshold will have a score of POSITIVE_INFINITY.
+    private double getScore(final String broker, final BrokerData brokerData, final ServiceConfiguration conf) {
+        final double overloadThresholdPercentage = conf.getLoadBalancerBrokerOverloadedThresholdPercentage();
+        final double maxUsageWithWeightPercentage =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf) * 100;
+
+        if (maxUsageWithWeightPercentage > overloadThresholdPercentage) {
+            log.warn("Broker {} is overloaded: max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+            return Double.POSITIVE_INFINITY;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+        }
+        return maxUsageWithWeightPercentage;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        double minScore = Double.POSITIVE_INFINITY;
+        // Maintain a list of all the best-scoring brokers and then randomly
+        // select one of them at the end.
+
+        for (String broker : candidates) {
+            final BrokerData brokerData = loadData.getBrokerData().get(broker);
+            final double score = getScore(broker, brokerData, conf);
+            if (score == Double.POSITIVE_INFINITY) {
+                final LocalBrokerData localData = brokerData.getLocalData();
+                log.warn(
+                        "Broker {} is overloaded: CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                                + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                                + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                        broker, localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                        localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                        localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                        conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                        conf.getLoadBalancerBandwithInResourceWeight(),
+                        conf.getLoadBalancerBandwithOutResourceWeight());
+            }
+            if (score < minScore) {
+                bestBrokers.clear();

Review Comment:
   One edge case I can think of here is that the least loaded broker will be repeatedly selected until the broker load is updated to the leader.
   
   I am wondering if filling `bestBrokers` only with `minScore` brokers limits the randomization pool too much.
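
   To illustrate the concern, here is a small sketch of strict minimum-score selection with a random tie-break. The quoted diff stops at `bestBrokers.clear()`, so the rest of the loop shown here is an assumption, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of min-score selection; not the code under review.
class MinScorePool {
    // Collects every broker whose score equals the minimum score seen.
    static List<String> minScorePool(Map<String, Double> scoreByBroker) {
        List<String> best = new ArrayList<>();
        double minScore = Double.POSITIVE_INFINITY;
        for (Map.Entry<String, Double> entry : scoreByBroker.entrySet()) {
            double score = entry.getValue();
            if (score < minScore) {
                // A strictly better score discards the previous pool.
                best.clear();
                minScore = score;
            }
            if (score == minScore) {
                best.add(entry.getKey());
            }
        }
        return best;
    }

    // Picks one broker from the pool uniformly at random.
    static Optional<String> pick(List<String> pool) {
        if (pool.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(pool.get(ThreadLocalRandom.current().nextInt(pool.size())));
    }

    public static void main(String[] args) {
        Map<String, Double> scores = new LinkedHashMap<>();
        scores.put("1", 10.0);
        scores.put("2", 10.5);
        scores.put("3", 60.0);
        List<String> pool = minScorePool(scores);
        System.out.println(pool + " -> " + pick(pool));
    }
}
```

   Because the scores are floating-point percentages, exact ties are rare, so the pool usually holds a single broker and the random pick has no effect. Broker "2" above is nearly as idle as broker "1" but is never chosen until the scores change.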



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // Form a score for a broker using its historical load and short-term load data with weight.
+    // Any broker above the overload threshold will have a score of POSITIVE_INFINITY.
+    private double getScore(final String broker, final BrokerData brokerData, final ServiceConfiguration conf) {
+        final double overloadThresholdPercentage = conf.getLoadBalancerBrokerOverloadedThresholdPercentage();
+        final double maxUsageWithWeightPercentage =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf) * 100;
+
+        if (maxUsageWithWeightPercentage > overloadThresholdPercentage) {
+            log.warn("Broker {} is overloaded: max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+            return Double.POSITIVE_INFINITY;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+        }
+        return maxUsageWithWeightPercentage;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        double minScore = Double.POSITIVE_INFINITY;
+        // Maintain of list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+
+        for (String broker : candidates) {
+            final BrokerData brokerData = loadData.getBrokerData().get(broker);
+            final double score = getScore(broker, brokerData, conf);
+            if (score == Double.POSITIVE_INFINITY) {
+                final LocalBrokerData localData = brokerData.getLocalData();
+                log.warn(
+                        "Broker {} is overloaded: CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                                + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                                + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                        broker, localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                        localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                        localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                        conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                        conf.getLoadBalancerBandwithInResourceWeight(),
+                        conf.getLoadBalancerBandwithOutResourceWeight());
+            }
+            if (score < minScore) {
+                bestBrokers.clear();
+                bestBrokers.add(broker);
+                minScore = score;
+            } else if (score == minScore) {
+                bestBrokers.add(broker);
+            }
+        }
+        if (bestBrokers.isEmpty()) {
+            // Assign randomly if all brokers are overloaded.
+            bestBrokers.addAll(candidates);
+        }
+
+        if (bestBrokers.isEmpty()) {
+            // If still, it means there are no available brokers at this point.

Review Comment:
   Please add a warn log here (e.g. when `candidates` is empty).
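
A minimal sketch of the kind of guard being requested, assuming a hypothetical helper `selectOrWarn` and using `java.util.logging` in place of the SLF4J logger only so the snippet has no dependencies. It is not the PR's code; the placeholder selection at the end stands in for the real scoring logic.

```java
import java.util.Optional;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical stand-in for the tail of selectBroker; names are illustrative only.
final class EmptyCandidatesGuard {
    // java.util.logging is used here instead of SLF4J to keep the sketch self-contained.
    private static final Logger LOG = Logger.getLogger(EmptyCandidatesGuard.class.getName());

    // Warns and returns empty when there is nothing to choose from; otherwise
    // returns an arbitrary candidate as a placeholder for the scored selection.
    static Optional<String> selectOrWarn(Set<String> candidates, String bundle) {
        if (candidates.isEmpty()) {
            LOG.warning("No candidate brokers available for bundle " + bundle);
            return Optional.empty();
        }
        return Optional.of(candidates.iterator().next());
    }
}
```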



##########
conf/broker.conf:
##########
@@ -1171,6 +1171,9 @@ defaultNamespaceBundleSplitAlgorithm=range_equally_divide
 # load shedding strategy, support OverloadShedder and ThresholdShedder, default is ThresholdShedder since 2.10.0
 loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder
 
+# load balance placement strategy, support LeastLongTermMessageRate and LeastResourceUsageWithWeight
+loadBalancerLoadPlacementStrategy=org.apache.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate

Review Comment:
   To me, `LeastResourceUsageWithWeight` looks better than `LeastLongTermMessageRate`. What is our plan for making `LeastResourceUsageWithWeight` the default?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // Form a score for a broker using its historical load and short-term load data with weight.
+    // Any broker at (or above) the overload threshold will have a score of POSITIVE_INFINITY.
+    private double getScore(final String broker, final BrokerData brokerData, final ServiceConfiguration conf) {
+        final double overloadThresholdPercentage = conf.getLoadBalancerBrokerOverloadedThresholdPercentage();
+        final double maxUsageWithWeightPercentage =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf) * 100;
+
+        if (maxUsageWithWeightPercentage > overloadThresholdPercentage) {
+            log.warn("Broker {} is overloaded: max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+            return Double.POSITIVE_INFINITY;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+        }
+        return maxUsageWithWeightPercentage;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        double minScore = Double.POSITIVE_INFINITY;
+        // Maintain of list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+
+        for (String broker : candidates) {
+            final BrokerData brokerData = loadData.getBrokerData().get(broker);
+            final double score = getScore(broker, brokerData, conf);
+            if (score == Double.POSITIVE_INFINITY) {
+                final LocalBrokerData localData = brokerData.getLocalData();
+                log.warn(
+                        "Broker {} is overloaded: CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                                + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                                + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                        broker, localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                        localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                        localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                        conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                        conf.getLoadBalancerBandwithInResourceWeight(),
+                        conf.getLoadBalancerBandwithOutResourceWeight());
+            }
+            if (score < minScore) {
+                bestBrokers.clear();
+                bestBrokers.add(broker);
+                minScore = score;
+            } else if (score == minScore) {
+                bestBrokers.add(broker);
+            }
+        }
+        if (bestBrokers.isEmpty()) {
+            // Assign randomly if all brokers are overloaded.

Review Comment:
   Please add a warn log here.
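
One thing worth checking when placing this log: as the loop is written, an overloaded broker scores `POSITIVE_INFINITY`, which equals the initial `minScore`, so it is added to `bestBrokers` through the `score == minScore` branch. The sketch below replays only that comparison logic on precomputed scores. `OverloadTieDemo`, `bestBrokers`, and `allOverloaded` are hypothetical names, and the explicit all-overloaded check is one possible place to hang the warning, not the PR's approach.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper that mirrors the comparison logic of the selection loop.
final class OverloadTieDemo {
    // Collects every broker whose score ties for the minimum.
    static List<String> bestBrokers(Map<String, Double> scores) {
        List<String> best = new ArrayList<>();
        double minScore = Double.POSITIVE_INFINITY;
        for (Map.Entry<String, Double> entry : scores.entrySet()) {
            double score = entry.getValue();
            if (score < minScore) {
                best.clear();
                best.add(entry.getKey());
                minScore = score;
            } else if (score == minScore) {
                // Infinite scores land here, so overloaded brokers are still collected.
                best.add(entry.getKey());
            }
        }
        return best;
    }

    // True when there is at least one broker and every score is infinite.
    static boolean allOverloaded(Map<String, Double> scores) {
        return !scores.isEmpty()
                && scores.values().stream().allMatch(s -> s == Double.POSITIVE_INFINITY);
    }
}
```

With two brokers that both score `POSITIVE_INFINITY`, `bestBrokers` returns both of them, so an `isEmpty()` check after the loop would not fire in that case.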



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {

Review Comment:
   I think this logic is similar to (and conflicts with) https://github.com/apache/pulsar/pull/14971, which is currently an open PR.
   
   Could you communicate with the author, @[dragonls](https://github.com/dragonls), and decide which one to pursue?
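
For reference when comparing the two PRs, the smoothing that `updateAndGetMaxResourceUsageWithWeight` applies in this diff can be sketched in isolation. `UsageSmoothing.smooth` is a hypothetical name, and the sample numbers in the note below are illustrative rather than taken from the PR.

```java
// Hypothetical extraction of the weighted-history formula from the diff.
final class UsageSmoothing {
    // A null history models the first sample seen for a broker, in which
    // case the current usage is returned unchanged.
    static double smooth(Double history, double current, double historyPercentage) {
        return history == null
                ? current
                : history * historyPercentage + (1 - historyPercentage) * current;
    }
}
```

Assuming a history weight of 0.9, a stored value of 0.40 and a new sample of 0.90 give roughly 0.45, so a single spike moves the score only slightly.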



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // Form a score for a broker using its historical load and short-term load data with weight.
+    // Any broker at (or above) the overload threshold will have a score of POSITIVE_INFINITY.
+    private double getScore(final String broker, final BrokerData brokerData, final ServiceConfiguration conf) {
+        final double overloadThresholdPercentage = conf.getLoadBalancerBrokerOverloadedThresholdPercentage();
+        final double maxUsageWithWeightPercentage =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf) * 100;
+
+        if (maxUsageWithWeightPercentage > overloadThresholdPercentage) {
+            log.warn("Broker {} is overloaded: max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+            return Double.POSITIVE_INFINITY;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+        }
+        return maxUsageWithWeightPercentage;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();

Review Comment:
   nit: please mention that this is not thread-safe.
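
The shared state behind this nit is the reused `bestBrokers` list and the `brokerAvgResourceUsageWithWeight` map. A Javadoc sentence on the class is enough to address the comment. If thread safety were wanted instead (an assumption, not something requested here), one option is a per-call list plus `ConcurrentHashMap.merge`, sketched below with the hypothetical class `ThreadSafeState`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical alternative to the shared mutable fields in the diff.
final class ThreadSafeState {
    private final Map<String, Double> avgUsage = new ConcurrentHashMap<>();

    // merge() stores the first sample as-is and otherwise applies the
    // weighted-history update atomically for that broker key.
    double update(String broker, double current, double historyPercentage) {
        return avgUsage.merge(broker, current,
                (history, cur) -> history * historyPercentage + (1 - historyPercentage) * cur);
    }

    // A fresh list per call avoids sharing one list across threads,
    // at the cost of the allocation the original field was avoiding.
    List<String> newBestBrokers() {
        return new ArrayList<>();
    }
}
```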



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // Form a score for a broker using its historical load and short-term load data with weight.
+    // Any broker at (or above) the overload threshold will have a score of POSITIVE_INFINITY.
+    private double getScore(final String broker, final BrokerData brokerData, final ServiceConfiguration conf) {
+        final double overloadThresholdPercentage = conf.getLoadBalancerBrokerOverloadedThresholdPercentage();
+        final double maxUsageWithWeightPercentage =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf) * 100;
+
+        if (maxUsageWithWeightPercentage > overloadThresholdPercentage) {
+            log.warn("Broker {} is overloaded: max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+            return Double.POSITIVE_INFINITY;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+        }
+        return maxUsageWithWeightPercentage;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        double minScore = Double.POSITIVE_INFINITY;
+        // Maintain of list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+
+        for (String broker : candidates) {
+            final BrokerData brokerData = loadData.getBrokerData().get(broker);
+            final double score = getScore(broker, brokerData, conf);
+            if (score == Double.POSITIVE_INFINITY) {
+                final LocalBrokerData localData = brokerData.getLocalData();
+                log.warn(
+                        "Broker {} is overloaded: CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                                + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                                + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                        broker, localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                        localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                        localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                        conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                        conf.getLoadBalancerBandwithInResourceWeight(),
+                        conf.getLoadBalancerBandwithOutResourceWeight());
+            }
+            if (score < minScore) {
+                bestBrokers.clear();
+                bestBrokers.add(broker);
+                minScore = score;
+            } else if (score == minScore) {
+                bestBrokers.add(broker);
+            }
+        }
+        if (bestBrokers.isEmpty()) {
+            // Assign randomly if all brokers are overloaded.
+            bestBrokers.addAll(candidates);
+        }
+
+        if (bestBrokers.isEmpty()) {
+            // If still, it means there are no available brokers at this point.
+            return Optional.empty();
+        }
+        return Optional.of(bestBrokers.get(ThreadLocalRandom.current().nextInt(bestBrokers.size())));

Review Comment:
   Please add a debug log, guarded by `if (log.isDebugEnabled()) {`, that logs the `bestBrokers` details (e.g. size) before the return.
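
A minimal sketch of such a guarded log, assuming a hypothetical helper `pick` and using `java.util.logging` (where `Level.FINE` plays the role of debug) in place of the SLF4J logger so the snippet stays dependency-free:

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the final lines of selectBroker.
final class PickWithDebugLog {
    private static final Logger LOG = Logger.getLogger(PickWithDebugLog.class.getName());

    // Logs the tied best brokers at debug level, then picks one at random.
    static Optional<String> pick(List<String> bestBrokers) {
        if (bestBrokers.isEmpty()) {
            return Optional.empty();
        }
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("Selecting from " + bestBrokers.size() + " best brokers: " + bestBrokers);
        }
        return Optional.of(bestBrokers.get(ThreadLocalRandom.current().nextInt(bestBrokers.size())));
    }
}
```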





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r911298461


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // Form a score for a broker using its historical load and short-term load data with weight.
+    // Any broker at (or above) the overload threshold will have a score of POSITIVE_INFINITY.
+    private double getScore(final String broker, final BrokerData brokerData, final ServiceConfiguration conf) {
+        final double overloadThresholdPercentage = conf.getLoadBalancerBrokerOverloadedThresholdPercentage();
+        final double maxUsageWithWeightPercentage =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf) * 100;
+
+        if (maxUsageWithWeightPercentage > overloadThresholdPercentage) {
+            log.warn("Broker {} is overloaded: max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+            return Double.POSITIVE_INFINITY;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeightPercentage);
+        }
+        return maxUsageWithWeightPercentage;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        double minScore = Double.POSITIVE_INFINITY;
+        // Maintain of list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+
+        for (String broker : candidates) {
+            final BrokerData brokerData = loadData.getBrokerData().get(broker);
+            final double score = getScore(broker, brokerData, conf);
+            if (score == Double.POSITIVE_INFINITY) {
+                final LocalBrokerData localData = brokerData.getLocalData();
+                log.warn(
+                        "Broker {} is overloaded: CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                                + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                                + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                        broker, localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                        localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                        localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                        conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                        conf.getLoadBalancerBandwithInResourceWeight(),
+                        conf.getLoadBalancerBandwithOutResourceWeight());
+            }
+            if (score < minScore) {
+                bestBrokers.clear();
+                bestBrokers.add(broker);
+                minScore = score;
+            } else if (score == minScore) {
+                bestBrokers.add(broker);
+            }
+        }
+        if (bestBrokers.isEmpty()) {
+            // Assign randomly if all brokers are overloaded.
+            bestBrokers.addAll(candidates);
+        }
+
+        if (bestBrokers.isEmpty()) {
+            // If still, it means there are no available brokers at this point.

Review Comment:
   Please add an error log here (e.g. to report that `candidates` is empty).
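
   One way this could look, as a sketch only. `FallbackSelection` and `select` are hypothetical names, and `java.util.logging` (`log.severe`) replaces the PR's slf4j `log.error` so the example is self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;

// Hypothetical helper, not part of the PR: mirrors the two-step fallback
// at the end of selectBroker and logs an error when nothing can be chosen.
class FallbackSelection {
    private static final Logger log = Logger.getLogger(FallbackSelection.class.getName());

    static Optional<String> select(Set<String> candidates, List<String> bestBrokers) {
        List<String> pool = new ArrayList<>(bestBrokers);
        if (pool.isEmpty()) {
            // No broker scored as "best": fall back to every candidate.
            pool.addAll(candidates);
        }
        if (pool.isEmpty()) {
            // Still empty, so the candidate set itself was empty.
            log.severe("No broker can be selected: the candidate set is empty");
            return Optional.empty();
        }
        return Optional.of(pool.get(ThreadLocalRandom.current().nextInt(pool.size())));
    }
}
```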





[GitHub] [pulsar] eolivelli commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r913533030


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // A broker's max resource usage with weight using its historical load and short-term load data with weight.
+    private double getMaxResourceUsageWithWeight(final String broker, final BrokerData brokerData,
+                                         final ServiceConfiguration conf) {
+        final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+        final double maxUsageWithWeight =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf);
+
+        if (maxUsageWithWeight > overloadThreshold) {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            log.warn(
+                    "Broker {} is overloaded, max resource usage with weight percentage: {}%, "
+                            + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                            + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                            + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                    broker, maxUsageWithWeight * 100,
+                    localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                    localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                    localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeight * 100);
+        }
+        return maxUsageWithWeight;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     * This method is not thread safety.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        // Maintain of list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+        double totalUsage = 0.0d;
+        for (String broker : candidates) {
+            BrokerData brokerData = loadData.getBrokerData().get(broker);
+            double usageWithWeight = getMaxResourceUsageWithWeight(broker, brokerData, conf);
+            totalUsage += usageWithWeight;
+        }
+
+        final double avgUsage = totalUsage / candidates.size();
+        final double diffThreshold =
+                conf.getLoadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage() / 100.0;
+        brokerAvgResourceUsageWithWeight.forEach((broker, avgResUsage) -> {
+            if (avgResUsage + diffThreshold <= avgUsage) {
+                bestBrokers.add(broker);
+            }
+        });
+
+        if (bestBrokers.isEmpty()) {
+            // Assign randomly if all brokers are overloaded.
+            log.warn("Assign randomly if all {} brokers are overloaded.", candidates.size());

Review Comment:
   In English it is better to say "Assign randomly as all {} brokers are overloaded".



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategy.java:
##########
@@ -49,12 +49,18 @@ Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign,
     /**
      * Create a placement strategy using the configuration.
      *
+     * @param conf ServiceConfiguration to use.
      * @return A placement strategy from the given configurations.
      */
-    static ModularLoadManagerStrategy create() {
+    static ModularLoadManagerStrategy create(final ServiceConfiguration conf) {
         try {
-            // Only one strategy at the moment.
-            return new LeastLongTermMessageRate();
+            Class<?> loadAssignmentClass = Class.forName(conf.getLoadBalancerLoadPlacementStrategy());
+            Object loadAssignmentInstance = loadAssignmentClass.getDeclaredConstructor().newInstance();

Review Comment:
   Please take a look at the other parts of the Pulsar code where we load classes with reflection; we should have some common code for this.
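
   To illustrate the kind of shared helper meant here, a rough sketch follows. `StrategyLoader.createInstance` is a hypothetical name and signature written for this example; it is not a claim about Pulsar's existing utility code.

```java
// Hypothetical shared helper, not Pulsar's actual utility: loads a class by
// name, checks it against the expected type, and instantiates it through
// its no-argument constructor.
class StrategyLoader {
    static <T> T createInstance(String className, Class<T> expectedType) {
        try {
            Class<?> raw = Class.forName(className);
            if (!expectedType.isAssignableFrom(raw)) {
                throw new IllegalArgumentException(
                        className + " does not implement " + expectedType.getName());
            }
            return expectedType.cast(raw.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            // Covers a missing class, a missing constructor, and instantiation failures.
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```

   Centralizing the type check means every caller gets the same error for a class that exists but has the wrong type, instead of a `ClassCastException` at the call site.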





[GitHub] [pulsar] HQebupt commented on pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#issuecomment-1174957075

   > I am not an expert in this area, so I won't say much about the algorithm.
   > 
   > Apart from that, I believe the change is good. We are adding a new pluggable part of Pulsar, so a short discussion on [dev@pulsar.apache.org](mailto:dev@pulsar.apache.org) is warranted.
   > 
   > Thanks for contributing this enhancement.
   
   Thanks, here is the email discussion thread: https://lists.apache.org/thread/36vyyhndr4og175k2bz3mfdf5ctd2xky.




[GitHub] [pulsar] heesung-sn commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r916068778


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // A broker's max resource usage with weight using its historical load and short-term load data with weight.
+    private double getMaxResourceUsageWithWeight(final String broker, final BrokerData brokerData,
+                                         final ServiceConfiguration conf) {
+        final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+        final double maxUsageWithWeight =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf);
+
+        if (maxUsageWithWeight > overloadThreshold) {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            log.warn(
+                    "Broker {} is overloaded, max resource usage with weight percentage: {}%, "
+                            + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                            + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                            + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                    broker, maxUsageWithWeight * 100,
+                    localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                    localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                    localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeight * 100);
+        }
+        return maxUsageWithWeight;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     * This method is not thread safety.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        // Maintain of list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+        double totalUsage = 0.0d;
+        for (String broker : candidates) {
+            BrokerData brokerData = loadData.getBrokerData().get(broker);
+            double usageWithWeight = getMaxResourceUsageWithWeight(broker, brokerData, conf);
+            totalUsage += usageWithWeight;
+        }
+
+        final double avgUsage = totalUsage / candidates.size();
+        final double diffThreshold =
+                conf.getLoadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage() / 100.0;
+        brokerAvgResourceUsageWithWeight.forEach((broker, avgResUsage) -> {
+            if (avgResUsage + diffThreshold <= avgUsage) {

Review Comment:
   It would be great if we could explain more about the negative and positive threshold considerations in the config `broker.conf`.
   
   Otherwise, LGTM.
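
   As a worked illustration of the sign of the threshold, here is a small sketch that applies the same `usage + diffThreshold <= avgUsage` test to made-up numbers. `ThresholdFilter`, `selectBelowAverage`, `sampleUsage`, and the usage values are all hypothetical; only the comparison itself is taken from the diff.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not part of the PR.
class ThresholdFilter {
    // usageByBroker: weighted usage per broker as a fraction in [0, 1].
    // thresholdPercentage: stand-in for the configured difference threshold.
    static List<String> selectBelowAverage(Map<String, Double> usageByBroker,
                                           int thresholdPercentage) {
        double total = 0.0;
        for (double usage : usageByBroker.values()) {
            total += usage;
        }
        final double avgUsage = total / usageByBroker.size();
        final double diffThreshold = thresholdPercentage / 100.0;
        List<String> selected = new ArrayList<>();
        usageByBroker.forEach((broker, usage) -> {
            // Same comparison as in the diff above.
            if (usage + diffThreshold <= avgUsage) {
                selected.add(broker);
            }
        });
        return selected;
    }

    // Made-up sample data: three brokers with an average usage of 0.50.
    static Map<String, Double> sampleUsage() {
        Map<String, Double> usage = new LinkedHashMap<>();
        usage.put("broker-a", 0.25);
        usage.put("broker-b", 0.50);
        usage.put("broker-c", 0.75);
        return usage;
    }
}
```

   With these numbers, a threshold of 10 keeps only `broker-a`, since a broker must sit at least 10 points below the average. A threshold of -10 also admits `broker-b`, since brokers up to 10 points above the average now qualify.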





[GitHub] [pulsar] eolivelli commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r914732087


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategy.java:
##########
@@ -49,12 +50,13 @@ Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign,
     /**
      * Create a placement strategy using the configuration.
      *
+     * @param conf ServiceConfiguration to use.
      * @return A placement strategy from the given configurations.
      */
-    static ModularLoadManagerStrategy create() {
+    static ModularLoadManagerStrategy create(final ServiceConfiguration conf) {
         try {
-            // Only one strategy at the moment.
-            return new LeastLongTermMessageRate();
+            return Reflections.createInstance(conf.getLoadBalancerLoadPlacementStrategy(),
+                    ModularLoadManagerStrategy.class, Thread.currentThread().getContextClassLoader());

Review Comment:
   Please do not ignore the error.
   Otherwise, the admin may have configured a wrong class name, and the broker would then run in an unexpected way.
   
   We must fail the broker startup.
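
   A sketch of the fail-fast behaviour being asked for, under stated assumptions: `FailFastStrategyFactory` is a hypothetical class, plain `Class.forName` is used in place of the `Reflections.createInstance` call from the diff, and the choice of `IllegalStateException` is an illustration rather than the PR's actual error handling.

```java
// Hypothetical factory, not part of the PR: propagates a configuration
// error instead of silently falling back to a default strategy.
class FailFastStrategyFactory {
    static Object create(String className) {
        try {
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // No fallback here: a wrong class name should stop startup
            // so the misconfiguration is visible immediately.
            throw new IllegalStateException(
                    "Could not load placement strategy class '" + className + "'", e);
        }
    }
}
```

   The caller that builds the load manager would let this exception propagate, so a mistyped class name aborts startup rather than leaving the broker running with a strategy the admin did not choose.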





[GitHub] [pulsar] HQebupt commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r911685742


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {

Review Comment:
   Sure. I can contact him.





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r912145116


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // A broker's max resource usage with weight using its historical load and short-term load data with weight.
+    private double getMaxResourceUsageWithWeight(final String broker, final BrokerData brokerData,
+                                         final ServiceConfiguration conf) {
+        final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+        final double maxUsageWithWeight =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf);
+
+        if (maxUsageWithWeight > overloadThreshold) {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            log.warn(
+                    "Broker {} is overloaded, max resource usage with weight percentage: {}%, "
+                            + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                            + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                            + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                    broker, maxUsageWithWeight * 100,
+                    localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                    localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                    localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeight * 100);
+        }
+        return maxUsageWithWeight;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     * This method is not thread-safe.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        // Maintain a list of all the best-scoring brokers and then randomly
+        // select one of them at the end.
+        double totalUsage = 0.0d;
+        for (String broker : candidates) {
+            BrokerData brokerData = loadData.getBrokerData().get(broker);
+            double usageWithWeight = getMaxResourceUsageWithWeight(broker, brokerData, conf);
+            totalUsage += usageWithWeight;
+        }
+
+        final double avgUsage = totalUsage / candidates.size();
+        final double diffThreshold =
+                conf.getLoadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage() / 100.0;
+        brokerAvgResourceUsageWithWeight.forEach((broker, avgResUsage) -> {
+            if (avgResUsage + diffThreshold <= avgUsage) {

Review Comment:
   I think this algorithm is better than `minScore`, but we still have the `small randomization pool` issue here.
   Example: b1:4, b2:20, b3:20
   
   Here, b1 will still be selected repeatedly until its updated load is reported to the leader, even though all of these brokers should be candidates because their loads are low. (As we discussed, when many bundles are unloaded in a short period, this might leave b1 overloaded, e.g. b1:90, b2:20, b3:20.)
   
   Can we do a high_load_threshold_filter and random selection?
   
   I think one could also implement a usage-and-selection-weighted random selection that weighs more toward the underloaded brokers.
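
One possible reading of the suggestion above, sketched under stated assumptions: brokers at or above a high-load threshold are filtered out, and the rest are drawn at random with probability proportional to their spare capacity (threshold minus usage). `WeightedBrokerPicker`, `pick`, and the 0.85 threshold are hypothetical and not part of the PR; usages are fractions in [0, 1]. The "selection-weighted" part of the suggestion (accounting for recent picks) is not covered here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;

public class WeightedBrokerPicker {
    /**
     * Pick a broker whose usage is below highLoadThreshold, weighting each
     * eligible broker by its spare capacity (highLoadThreshold - usage).
     */
    public static Optional<String> pick(Map<String, Double> usage, double highLoadThreshold, Random random) {
        List<String> pool = new ArrayList<>();
        List<Double> weights = new ArrayList<>();
        double totalWeight = 0.0;
        for (Map.Entry<String, Double> e : usage.entrySet()) {
            if (e.getValue() < highLoadThreshold) {
                double weight = highLoadThreshold - e.getValue(); // strictly positive
                pool.add(e.getKey());
                weights.add(weight);
                totalWeight += weight;
            }
        }
        if (pool.isEmpty()) {
            return Optional.empty(); // every candidate is at or above the threshold
        }
        // Roulette-wheel selection over the cumulative weights.
        double r = random.nextDouble() * totalWeight;
        for (int i = 0; i < pool.size(); i++) {
            r -= weights.get(i);
            if (r < 0) {
                return Optional.of(pool.get(i));
            }
        }
        return Optional.of(pool.get(pool.size() - 1)); // guard against rounding error
    }

    public static void main(String[] args) {
        Map<String, Double> usage = new LinkedHashMap<>();
        usage.put("b1", 0.04);
        usage.put("b2", 0.20);
        usage.put("b3", 0.20);
        Map<String, Integer> counts = new LinkedHashMap<>();
        Random random = new Random();
        for (int i = 0; i < 10_000; i++) {
            counts.merge(pick(usage, 0.85, random).orElseThrow(), 1, Integer::sum);
        }
        System.out.println(counts);
    }
}
```

With the example loads b1:4%, b2:20%, b3:20% and a threshold of 85%, the weights are 0.81, 0.65, and 0.65, so b1 is favoured (roughly 38% of draws) without absorbing every bundle while the leader waits for updated load reports.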


