Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/01 17:39:09 UTC

[GitHub] [pulsar] heesung-sn commented on a diff in pull request #16281: [feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight

heesung-sn commented on code in PR #16281:
URL: https://github.com/apache/pulsar/pull/16281#discussion_r912145116


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placement strategy which selects a broker based on which one has the least resource usage with weight.
+ * This strategy takes into account the historical load percentage and short-term load percentage, and thus will not
+ * cause cluster fluctuations due to short-term load jitter.
+ */
+public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy {
+    private static Logger log = LoggerFactory.getLogger(LeastResourceUsageWithWeight.class);
+
+    // Maintain this list to reduce object creation.
+    private final ArrayList<String> bestBrokers;
+    private final Map<String, Double> brokerAvgResourceUsageWithWeight;
+
+    public LeastResourceUsageWithWeight() {
+        this.bestBrokers = new ArrayList<>();
+        this.brokerAvgResourceUsageWithWeight = new HashMap<>();
+    }
+
+    // A broker's max resource usage with weight using its historical load and short-term load data with weight.
+    private double getMaxResourceUsageWithWeight(final String broker, final BrokerData brokerData,
+                                         final ServiceConfiguration conf) {
+        final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
+        final double maxUsageWithWeight =
+                updateAndGetMaxResourceUsageWithWeight(broker, brokerData, conf);
+
+        if (maxUsageWithWeight > overloadThreshold) {
+            final LocalBrokerData localData = brokerData.getLocalData();
+            log.warn(
+                    "Broker {} is overloaded, max resource usage with weight percentage: {}%, "
+                            + "CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+                            + "BANDWIDTH OUT: {}%, CPU weight: {}, MEMORY weight: {}, DIRECT MEMORY weight: {}, "
+                            + "BANDWIDTH IN weight: {}, BANDWIDTH OUT weight: {}",
+                    broker, maxUsageWithWeight * 100,
+                    localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
+                    localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
+                    localData.getBandwidthOut().percentUsage(), conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Broker {} has max resource usage with weight percentage: {}%",
+                    brokerData.getLocalData().getWebServiceUrl(), maxUsageWithWeight * 100);
+        }
+        return maxUsageWithWeight;
+    }
+
+    /**
+     * Update and get the max resource usage with weight of broker according to the service configuration.
+     *
+     * @param broker     the broker name.
+     * @param brokerData The broker load data.
+     * @param conf       The service configuration.
+     * @return the max resource usage with weight of broker
+     */
+    private double updateAndGetMaxResourceUsageWithWeight(String broker, BrokerData brokerData,
+                                                          ServiceConfiguration conf) {
+        final double historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
+        Double historyUsage = brokerAvgResourceUsageWithWeight.get(broker);
+        double resourceUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
+                conf.getLoadBalancerCPUResourceWeight(),
+                conf.getLoadBalancerMemoryResourceWeight(),
+                conf.getLoadBalancerDirectMemoryResourceWeight(),
+                conf.getLoadBalancerBandwithInResourceWeight(),
+                conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Broker {} get max resource usage with weight: {}, history resource percentage: {}%, CPU weight: "
+                            + "{}, MEMORY weight: {}, DIRECT MEMORY weight: {}, BANDWIDTH IN weight: {}, BANDWIDTH "
+                            + "OUT weight: {} ",
+                    broker, historyUsage, historyPercentage, conf.getLoadBalancerCPUResourceWeight(),
+                    conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
+                    conf.getLoadBalancerBandwithInResourceWeight(),
+                    conf.getLoadBalancerBandwithOutResourceWeight());
+        }
+        brokerAvgResourceUsageWithWeight.put(broker, historyUsage);
+        return historyUsage;
+    }
+
+    /**
+     * Find a suitable broker to assign the given bundle to.
+     * This method is not thread-safe.
+     *
+     * @param candidates     The candidates for which the bundle may be assigned.
+     * @param bundleToAssign The data for the bundle to assign.
+     * @param loadData       The load data from the leader broker.
+     * @param conf           The service configuration.
+     * @return The name of the selected broker as it appears on ZooKeeper.
+     */
+    @Override
+    public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
+                                         ServiceConfiguration conf) {
+        bestBrokers.clear();
+        // Maintain a list of all the best scoring brokers and then randomly
+        // select one of them at the end.
+        double totalUsage = 0.0d;
+        for (String broker : candidates) {
+            BrokerData brokerData = loadData.getBrokerData().get(broker);
+            double usageWithWeight = getMaxResourceUsageWithWeight(broker, brokerData, conf);
+            totalUsage += usageWithWeight;
+        }
+
+        final double avgUsage = totalUsage / candidates.size();
+        final double diffThreshold =
+                conf.getLoadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage() / 100.0;
+        brokerAvgResourceUsageWithWeight.forEach((broker, avgResUsage) -> {
+            if (avgResUsage + diffThreshold <= avgUsage) {

Review Comment:
   I think this algorithm is better than `minScore`, but we still have the `small randomization pool` issue here.
   Example: b1: 4%, b2: 20%, b3: 20%
   
   Here, b1 will still be selected repeatedly until its updated load is reported to the leader, even though all of these brokers should be candidates because their loads are low. (As we discussed, when many bundles are unloaded in a short period, this might leave b1 overloaded, e.g. b1: 90%, b2: 20%, b3: 20%.)
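
To make the example concrete, here is a minimal sketch that applies only the condition visible in the diff (`avgResUsage + diffThreshold <= avgUsage`) to the three loads above. The class `SmallPoolExample`, the helper `belowAverage`, and the 10% threshold value are illustrative assumptions, not code from the PR. It also assumes that brokers passing the check form the selection pool, since the body of the `if` is cut off in the diff.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the PR: it reproduces only the visible filter condition.
final class SmallPoolExample {

    // Returns the brokers whose usage is at least diffThreshold below the average usage.
    static List<String> belowAverage(Map<String, Double> usage, double diffThreshold) {
        double avg = usage.values().stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(0.0);
        List<String> pool = new ArrayList<>();
        usage.forEach((broker, u) -> {
            if (u + diffThreshold <= avg) {
                pool.add(broker);
            }
        });
        return pool;
    }

    public static void main(String[] args) {
        Map<String, Double> usage = new LinkedHashMap<>();
        usage.put("b1", 0.04);
        usage.put("b2", 0.20);
        usage.put("b3", 0.20);
        // Average is 0.44 / 3, roughly 0.147. Only b1 satisfies 0.04 + 0.10 <= 0.147,
        // so the random pick is made from a pool of one.
        System.out.println(belowAverage(usage, 0.10)); // prints [b1]
    }
}
```

With a pool of one, every assignment made before the next load report goes to b1, which is the scenario described above.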
   
   Can we apply a high_load_threshold_filter and then select randomly among the remaining brokers?
   
   I think one could also implement a usage-and-selection-weighted random selection that gives more weight to the underloaded brokers.
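
A rough sketch of how the two suggestions could fit together, under stated assumptions: first drop every broker at or above a high-load threshold, then pick among the rest with probability proportional to headroom (`1 - usage`). The names `WeightedBrokerPicker`, `filterByThreshold`, and `pick`, and the headroom weighting itself, are hypothetical and not taken from the PR or from Pulsar. The sketch weights by usage only and does not track how often a broker was recently selected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;

// Hypothetical sketch, not Pulsar code: threshold filter followed by a weighted random pick.
final class WeightedBrokerPicker {

    // Keep every candidate whose usage (a fraction in [0, 1]) is below the high-load threshold.
    static List<String> filterByThreshold(Map<String, Double> usage, double highLoadThreshold) {
        List<String> pool = new ArrayList<>();
        usage.forEach((broker, u) -> {
            if (u < highLoadThreshold) {
                pool.add(broker);
            }
        });
        return pool;
    }

    // Pick one broker from the filtered pool; less loaded brokers are proportionally more likely.
    static Optional<String> pick(Map<String, Double> usage, double highLoadThreshold, Random random) {
        List<String> pool = filterByThreshold(usage, highLoadThreshold);
        if (pool.isEmpty()) {
            return Optional.empty();
        }
        double totalWeight = 0.0;
        for (String broker : pool) {
            totalWeight += Math.max(0.0, 1.0 - usage.get(broker));
        }
        // Walk the pool, subtracting each weight until the random point is passed.
        double point = random.nextDouble() * totalWeight;
        for (String broker : pool) {
            point -= Math.max(0.0, 1.0 - usage.get(broker));
            if (point < 0) {
                return Optional.of(broker);
            }
        }
        // Reached only through floating-point rounding or when every weight is zero.
        return Optional.of(pool.get(pool.size() - 1));
    }
}
```

With loads of b1: 4%, b2: 20%, b3: 20% and a threshold of, say, 85%, all three brokers stay in the pool and b1 is only slightly favored (weights 0.96, 0.80, 0.80), so repeated assignments are spread across the brokers instead of all landing on b1.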



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
