You are viewing a plain text version of this content; the link to the canonical version was not preserved in this text.
Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2022/05/13 16:12:02 UTC

[hadoop] branch trunk updated: YARN-11073. Avoid unnecessary preemption for tiny queues under certain corner cases (#4110)

This is an automated email from the ASF dual-hosted git repository.

aajisaka pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d2c9eb653a5 YARN-11073. Avoid unnecessary preemption for tiny queues under certain corner cases (#4110)
d2c9eb653a5 is described below

commit d2c9eb653a5bbb0903a86c26fbb8bf330858d26b
Author: Jian Chen <jc...@users.noreply.github.com>
AuthorDate: Fri May 13 09:11:42 2022 -0700

    YARN-11073. Avoid unnecessary preemption for tiny queues under certain corner cases (#4110)
    
    Co-authored-by: Jian Chen <ji...@airbnb.com>
    Signed-off-by: Akira Ajisaka <aa...@apache.org>
---
 .../AbstractPreemptableResourceCalculator.java     | 173 ++++++++++++++++++---
 .../monitor/capacity/TempQueuePerPartition.java    |   4 +
 2 files changed, 156 insertions(+), 21 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
index c8f68a26a07..c3c594512b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
@@ -31,12 +31,16 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.PriorityQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Calculate how much resources need to be preempted for each queue,
  * will be used by {@link PreemptionCandidatesSelector}.
  */
 public class AbstractPreemptableResourceCalculator {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbstractPreemptableResourceCalculator.class); // SLF4J class logger
 
   protected final CapacitySchedulerPreemptionContext context;
   protected final ResourceCalculator rc;
@@ -76,6 +80,34 @@ public class AbstractPreemptableResourceCalculator {
     }
   }
 
+  private static class NormalizationTuple { // (numerator, denominator) pair of Resources
+    private Resource numerator;
+    private Resource denominator;
+
+    NormalizationTuple(Resource numer, Resource denom) {
+      this.numerator = numer;
+      this.denominator = denom;
+    }
+
+    long getNumeratorValue(int i) { // raw value of resource type i in the numerator
+      return numerator.getResourceInformation(i).getValue();
+    }
+
+    long getDenominatorValue(int i) { // type-i denominator value in numerator's units
+      String nUnits = numerator.getResourceInformation(i).getUnits();
+      ResourceInformation dResourceInformation = denominator
+          .getResourceInformation(i);
+      return UnitsConversionUtil.convert(
+          dResourceInformation.getUnits(), nUnits, dResourceInformation.getValue());
+    }
+
+    float getNormalizedValue(int i) { // type-i ratio; 0.0f when denominator is 0
+      long nValue = getNumeratorValue(i);
+      long dValue = getDenominatorValue(i);
+      return dValue == 0 ? 0.0f : (float) nValue / dValue;
+    }
+  }
+
   /**
    * PreemptableResourceCalculator constructor.
    *
@@ -175,7 +207,7 @@ public class AbstractPreemptableResourceCalculator {
         unassigned, Resources.none())) {
       // we compute normalizedGuarantees capacity based on currently active
       // queues
-      resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
+      resetCapacity(orderedByNeed, ignoreGuarantee);
 
       // For each underserved queue (or set of queues if multiple are equally
       // underserved), offer its share of the unassigned resources based on its
@@ -252,47 +284,146 @@ public class AbstractPreemptableResourceCalculator {
   /**
    * Computes a normalizedGuaranteed capacity based on active queues.
    *
-   * @param clusterResource
-   *          the total amount of resources in the cluster
    * @param queues
    *          the list of queues to consider
    * @param ignoreGuar
    *          ignore guarantee.
    */
-  private void resetCapacity(Resource clusterResource,
-      Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
+  private void resetCapacity(Collection<TempQueuePerPartition> queues,
+                             boolean ignoreGuar) {
     Resource activeCap = Resource.newInstance(0, 0);
+    float activeTotalAbsCap = 0.0f; // sum of getAbsCapacity() over the queues
     int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();

     if (ignoreGuar) {
-      for (TempQueuePerPartition q : queues) {
-        for (int i = 0; i < maxLength; i++) {
-          q.normalizedGuarantee[i] = 1.0f / queues.size();
+      for (int i = 0; i < maxLength; i++) {
+        for (TempQueuePerPartition q : queues) {
+          computeNormGuarEvenly(q, queues.size(), i);
         }
       }
     } else {
       for (TempQueuePerPartition q : queues) {
         Resources.addTo(activeCap, q.getGuaranteed());
+        activeTotalAbsCap += q.getAbsCapacity();
       }
-      for (TempQueuePerPartition q : queues) {
-        for (int i = 0; i < maxLength; i++) {
-          ResourceInformation nResourceInformation = q.getGuaranteed()
-              .getResourceInformation(i);
-          ResourceInformation dResourceInformation = activeCap
-              .getResourceInformation(i);
-
-          long nValue = nResourceInformation.getValue();
-          long dValue = UnitsConversionUtil.convert(
-              dResourceInformation.getUnits(), nResourceInformation.getUnits(),
-              dResourceInformation.getValue());
-          if (dValue != 0) {
-            q.normalizedGuarantee[i] = (float) nValue / dValue;
+
+      // For each resource type, pick one normalization strategy and apply it to all queues.
+      for (int i = 0; i < maxLength; i++) {
+        boolean useAbsCapBasedNorm = false;
+        // If the active queues' absolute capacities sum to 0, fall back to
+        // even normalization.
+        boolean useEvenlyDistNorm = activeTotalAbsCap == 0;
+
+        // First pass: inspect every queue to choose the normalization strategy
+        // for resource type i.
+        for (TempQueuePerPartition q : queues) {
+          NormalizationTuple normTuple = new NormalizationTuple(
+              q.getGuaranteed(), activeCap);
+          long queueGuaranValue = normTuple.getNumeratorValue(i);
+          long totalActiveGuaranValue = normTuple.getDenominatorValue(i);
+
+          if (queueGuaranValue == 0 && q.getAbsCapacity() != 0 && totalActiveGuaranValue != 0) {
+            // This queue's guarantee for type i is 0 while its absolute capacity is not,
+            // so normalize this resource type by absolute capacity instead.
+            useAbsCapBasedNorm = true;
+            break;
+          }
+
+          if (totalActiveGuaranValue == 0) {
+            // A zero totalActiveGuaranValue means every active queue's guarantee for this
+            // resource dimension is tiny (close to 0).
+            // For example, a queue with a 1% minCapacity on a cluster with 48 total vcores
+            // is guaranteed 48 * 0.01 = 0.48 vcores, which becomes 0 once rounded/cast
+            // from double to long.
+            // With a zero denominator, resources are spread evenly across the active queues,
+            // on the assumption that such tiny queues have roughly equal absolute capacities.
+            useEvenlyDistNorm = true;
+          }
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Queue normalization strategy: " +
+              "absoluteCapacityBasedNormalization(" + useAbsCapBasedNorm +
+              "), evenlyDistributedNormalization(" + useEvenlyDistNorm +
+              "), defaultNormalization(" + !(useAbsCapBasedNorm || useEvenlyDistNorm) + ")");
+        }
+
+        // Second pass: apply the chosen strategy; abs-capacity takes precedence over even.
+        for (TempQueuePerPartition q : queues) {
+          if (useAbsCapBasedNorm) {
+            computeNormGuarFromAbsCapacity(q, activeTotalAbsCap, i);
+          } else if (useEvenlyDistNorm) {
+            computeNormGuarEvenly(q, queues.size(), i);
+          } else {
+            computeDefaultNormGuar(q, activeCap, i);
           }
         }
       }
     }
   }
 
+  /**
+   * Sets the normalized guarantee to the queue's share of the active queues' total abs capacity.
+   *
+   * Example:
+   *  There are two active queues, queueA and queueB, whose configured
+   *  absolute minimum capacities are 1% and 3% respectively.
+   *
+   *  Then their normalized guaranteed capacities are:
+   *    normalized_guar_queueA = 0.01 / (0.01 + 0.03) = 0.25
+   *    normalized_guar_queueB = 0.03 / (0.01 + 0.03) = 0.75
+   *
+   * @param q
+   *          the queue to consider
+   * @param activeTotalAbsCap
+   *          the sum of absolute capacity of all active queues
+   * @param resourceTypeIdx
+   *          index of the processing resource type
+   */
+  private static void computeNormGuarFromAbsCapacity(TempQueuePerPartition q,
+                                                     float activeTotalAbsCap,
+                                                     int resourceTypeIdx) {
+    if (activeTotalAbsCap != 0) { // otherwise the existing value is left unchanged
+      q.normalizedGuarantee[resourceTypeIdx] = q.getAbsCapacity() / activeTotalAbsCap;
+    }
+  }
+
+  /**
+   * Sets the normalized guarantee to an even share of 1 / numOfActiveQueues.
+   *
+   * @param q
+   *          the queue to consider
+   * @param numOfActiveQueues
+   *          number of active queues (assumed > 0; 0 would yield Infinity)
+   * @param resourceTypeIdx
+   *          index of the processing resource type
+   */
+  private static void computeNormGuarEvenly(TempQueuePerPartition q,
+                                            int numOfActiveQueues,
+                                            int resourceTypeIdx) {
+    q.normalizedGuarantee[resourceTypeIdx] = 1.0f / numOfActiveQueues;
+  }
+
+  /**
+   * The default way to compute a queue's normalized guaranteed capacity.
+   *
+   * For the given resource type, divide the queue's guaranteed amount by the total guaranteed
+   * amount of all active queues (in the queue's units); the result is 0 if that total is 0.
+   *
+   * @param q
+   *          the queue to consider
+   * @param activeCap
+   *          total guaranteed resources of all active queues
+   * @param resourceTypeIdx
+   *          index of the processing resource type
+   */
+  private static void computeDefaultNormGuar(TempQueuePerPartition q,
+                                             Resource activeCap,
+                                             int resourceTypeIdx) {
+    NormalizationTuple normTuple = new NormalizationTuple(q.getGuaranteed(), activeCap);
+    q.normalizedGuarantee[resourceTypeIdx] = normTuple.getNormalizedValue(resourceTypeIdx);
+  }
+
   // Take the most underserved TempQueue (the one on the head). Collect and
   // return the list of all queues that have the same idealAssigned
   // percentage of guaranteed.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 958c08e8038..78075bb5c17 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -201,6 +201,10 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
     return remain;
   }
 
+  public float getAbsCapacity() { // read-only accessor for the absCapacity field
+    return absCapacity;
+  }
+
   public Resource getGuaranteed() {
     if(!effMinRes.equals(Resources.none())) {
       return Resources.clone(effMinRes);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org