You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2022/05/13 16:12:02 UTC
[hadoop] branch trunk updated: YARN-11073. Avoid unnecessary preemption for tiny queues under certain corner cases (#4110)
This is an automated email from the ASF dual-hosted git repository.
aajisaka pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new d2c9eb653a5 YARN-11073. Avoid unnecessary preemption for tiny queues under certain corner cases (#4110)
d2c9eb653a5 is described below
commit d2c9eb653a5bbb0903a86c26fbb8bf330858d26b
Author: Jian Chen <jc...@users.noreply.github.com>
AuthorDate: Fri May 13 09:11:42 2022 -0700
YARN-11073. Avoid unnecessary preemption for tiny queues under certain corner cases (#4110)
Co-authored-by: Jian Chen <ji...@airbnb.com>
Signed-off-by: Akira Ajisaka <aa...@apache.org>
---
.../AbstractPreemptableResourceCalculator.java | 173 ++++++++++++++++++---
.../monitor/capacity/TempQueuePerPartition.java | 4 +
2 files changed, 156 insertions(+), 21 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
index c8f68a26a07..c3c594512b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
@@ -31,12 +31,16 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Calculate how much resources need to be preempted for each queue,
* will be used by {@link PreemptionCandidatesSelector}.
*/
public class AbstractPreemptableResourceCalculator {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbstractPreemptableResourceCalculator.class);
protected final CapacitySchedulerPreemptionContext context;
protected final ResourceCalculator rc;
@@ -76,6 +80,34 @@ public class AbstractPreemptableResourceCalculator {
}
}
+ private static class NormalizationTuple {
+ private Resource numerator;
+ private Resource denominator;
+
+ NormalizationTuple(Resource numer, Resource denom) {
+ this.numerator = numer;
+ this.denominator = denom;
+ }
+
+ long getNumeratorValue(int i) {
+ return numerator.getResourceInformation(i).getValue();
+ }
+
+ long getDenominatorValue(int i) {
+ String nUnits = numerator.getResourceInformation(i).getUnits();
+ ResourceInformation dResourceInformation = denominator
+ .getResourceInformation(i);
+ return UnitsConversionUtil.convert(
+ dResourceInformation.getUnits(), nUnits, dResourceInformation.getValue());
+ }
+
+ float getNormalizedValue(int i) {
+ long nValue = getNumeratorValue(i);
+ long dValue = getDenominatorValue(i);
+ return dValue == 0 ? 0.0f : (float) nValue / dValue;
+ }
+ }
+
/**
* PreemptableResourceCalculator constructor.
*
@@ -175,7 +207,7 @@ public class AbstractPreemptableResourceCalculator {
unassigned, Resources.none())) {
// we compute normalizedGuarantees capacity based on currently active
// queues
- resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
+ resetCapacity(orderedByNeed, ignoreGuarantee);
// For each underserved queue (or set of queues if multiple are equally
// underserved), offer its share of the unassigned resources based on its
@@ -252,47 +284,146 @@ public class AbstractPreemptableResourceCalculator {
/**
* Computes a normalizedGuaranteed capacity based on active queues.
*
- * @param clusterResource
- * the total amount of resources in the cluster
* @param queues
* the list of queues to consider
* @param ignoreGuar
* ignore guarantee.
*/
- private void resetCapacity(Resource clusterResource,
- Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
+ private void resetCapacity(Collection<TempQueuePerPartition> queues,
+ boolean ignoreGuar) {
Resource activeCap = Resource.newInstance(0, 0);
+ float activeTotalAbsCap = 0.0f;
int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
if (ignoreGuar) {
- for (TempQueuePerPartition q : queues) {
- for (int i = 0; i < maxLength; i++) {
- q.normalizedGuarantee[i] = 1.0f / queues.size();
+ for (int i = 0; i < maxLength; i++) {
+ for (TempQueuePerPartition q : queues) {
+ computeNormGuarEvenly(q, queues.size(), i);
}
}
} else {
for (TempQueuePerPartition q : queues) {
Resources.addTo(activeCap, q.getGuaranteed());
+ activeTotalAbsCap += q.getAbsCapacity();
}
- for (TempQueuePerPartition q : queues) {
- for (int i = 0; i < maxLength; i++) {
- ResourceInformation nResourceInformation = q.getGuaranteed()
- .getResourceInformation(i);
- ResourceInformation dResourceInformation = activeCap
- .getResourceInformation(i);
-
- long nValue = nResourceInformation.getValue();
- long dValue = UnitsConversionUtil.convert(
- dResourceInformation.getUnits(), nResourceInformation.getUnits(),
- dResourceInformation.getValue());
- if (dValue != 0) {
- q.normalizedGuarantee[i] = (float) nValue / dValue;
+
+ // loop through all resource types and normalize guaranteed capacity for all queues
+ for (int i = 0; i < maxLength; i++) {
+ boolean useAbsCapBasedNorm = false;
+ // if the sum of absolute capacity of all queues involved is 0,
+ // we should normalize evenly
+ boolean useEvenlyDistNorm = activeTotalAbsCap == 0;
+
+ // loop through all the queues once to determine the
+ // right normalization strategy for current processing resource type
+ for (TempQueuePerPartition q : queues) {
+ NormalizationTuple normTuple = new NormalizationTuple(
+ q.getGuaranteed(), activeCap);
+ long queueGuaranValue = normTuple.getNumeratorValue(i);
+ long totalActiveGuaranValue = normTuple.getDenominatorValue(i);
+
+ if (queueGuaranValue == 0 && q.getAbsCapacity() != 0 && totalActiveGuaranValue != 0) {
+ // when the rounded value of a resource type is 0 but its absolute capacity is not 0,
+ // we should consider taking the normalized guarantee based on absolute capacity
+ useAbsCapBasedNorm = true;
+ break;
+ }
+
+ if (totalActiveGuaranValue == 0) {
+ // If totalActiveGuaranValue from activeCap is zero, that means the guaranteed capacity
+ // of this resource dimension for all active queues is tiny (close to 0).
+ // For example, if a queue has 1% of minCapacity on a cluster with a totalVcores of 48,
+ // then the idealAssigned Vcores for this queue is (48 * 0.01)=0.48 which then
+ // get rounded/casted into 0 (double -> long)
+ // In this scenario where the denominator is 0, we can just spread resources across
+ // all tiny queues evenly since their absoluteCapacity are roughly the same
+ useEvenlyDistNorm = true;
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Queue normalization strategy: " +
+ "absoluteCapacityBasedNormalization(" + useAbsCapBasedNorm +
+ "), evenlyDistributedNormalization(" + useEvenlyDistNorm +
+ "), defaultNormalization(" + !(useAbsCapBasedNorm || useEvenlyDistNorm) + ")");
+ }
+
+ // loop through all the queues again to apply normalization strategy
+ for (TempQueuePerPartition q : queues) {
+ if (useAbsCapBasedNorm) {
+ computeNormGuarFromAbsCapacity(q, activeTotalAbsCap, i);
+ } else if (useEvenlyDistNorm) {
+ computeNormGuarEvenly(q, queues.size(), i);
+ } else {
+ computeDefaultNormGuar(q, activeCap, i);
}
}
}
}
}
+ /**
+ * Computes the normalized guaranteed capacity based on the weight of a queue's abs capacity.
+ *
+ * Example:
+ * There are two active queues: queueA & queueB, and
+ * their configured absolute minimum capacity is 1% and 3% respectively.
+ *
+ * Then their normalized guaranteed capacity are:
+ * normalized_guar_queueA = 0.01 / (0.01 + 0.03) = 0.25
+ * normalized_guar_queueB = 0.03 / (0.01 + 0.03) = 0.75
+ *
+ * @param q
+ * the queue to consider
+ * @param activeTotalAbsCap
+ * the sum of absolute capacity of all active queues
+ * @param resourceTypeIdx
+ * index of the processing resource type
+ */
+ private static void computeNormGuarFromAbsCapacity(TempQueuePerPartition q,
+ float activeTotalAbsCap,
+ int resourceTypeIdx) {
+ if (activeTotalAbsCap != 0) {
+ q.normalizedGuarantee[resourceTypeIdx] = q.getAbsCapacity() / activeTotalAbsCap;
+ }
+ }
+
+ /**
+ * Computes the normalized guaranteed capacity evenly based on num of active queues.
+ *
+ * @param q
+ * the queue to consider
+ * @param numOfActiveQueues
+ * number of active queues
+ * @param resourceTypeIdx
+ * index of the processing resource type
+ */
+ private static void computeNormGuarEvenly(TempQueuePerPartition q,
+ int numOfActiveQueues,
+ int resourceTypeIdx) {
+ q.normalizedGuarantee[resourceTypeIdx] = 1.0f / numOfActiveQueues;
+ }
+
+ /**
+ * The default way to compute a queue's normalized guaranteed capacity.
+ *
+ * For each resource type, divide a queue's configured guaranteed amount (MBs/Vcores) by
+ * the total amount of guaranteed resource of all active queues
+ *
+ * @param q
+ * the queue to consider
+ * @param activeCap
+ * total guaranteed resources of all active queues
+ * @param resourceTypeIdx
+ * index of the processing resource type
+ */
+ private static void computeDefaultNormGuar(TempQueuePerPartition q,
+ Resource activeCap,
+ int resourceTypeIdx) {
+ NormalizationTuple normTuple = new NormalizationTuple(q.getGuaranteed(), activeCap);
+ q.normalizedGuarantee[resourceTypeIdx] = normTuple.getNormalizedValue(resourceTypeIdx);
+ }
+
// Take the most underserved TempQueue (the one on the head). Collect and
// return the list of all queues that have the same idealAssigned
// percentage of guaranteed.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 958c08e8038..78075bb5c17 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -201,6 +201,10 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
return remain;
}
+ public float getAbsCapacity() {
+ return absCapacity;
+ }
+
public Resource getGuaranteed() {
if(!effMinRes.equals(Resources.none())) {
return Resources.clone(effMinRes);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org