You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2016/01/29 03:53:46 UTC
hadoop git commit: YARN-4172. Extend DominantResourceCalculator to
account for all resources. (Varun Vasudev via wangda)
Repository: hadoop
Updated Branches:
refs/heads/YARN-3926 d328f70ec -> f404e0343
YARN-4172. Extend DominantResourceCalculator to account for all resources. (Varun Vasudev via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f404e034
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f404e034
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f404e034
Branch: refs/heads/YARN-3926
Commit: f404e0343422c38b6ac459e81dc855678c159596
Parents: d328f70
Author: Wangda Tan <wa...@apache.org>
Authored: Fri Jan 29 10:53:31 2016 +0800
Committer: Wangda Tan <wa...@apache.org>
Committed: Fri Jan 29 10:53:31 2016 +0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 15 +-
.../resource/DominantResourceCalculator.java | 373 ++++++++++++++-----
.../yarn/util/resource/ResourceCalculator.java | 15 +
3 files changed, 299 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f404e034/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 1ce1ffc..a4d4fb9 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -1,5 +1,17 @@
Hadoop YARN Change Log
+YARN-3926 - Unreleased
+
+ NEW FEATURES
+
+ YARN-4081. Add support for multiple resource types in the Resource
+ class. (Varun Vasudev via wangda)
+
+ IMPROVEMENTS
+
+ YARN-4172. Extend DominantResourceCalculator to account for all
+ resources. (Varun Vasudev via wangda)
+
Trunk - Unreleased
INCOMPATIBLE CHANGES
@@ -381,9 +393,6 @@ Release 2.8.0 - UNRELEASED
YARN-4265. Provide new timeline plugin storage to support fine-grained entity
caching. (Li Lu and Jason Lowe via junping_du)
- YARN-4081. Add support for multiple resource types in the Resource
- class. (Varun Vasudev via wangda)
-
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f404e034/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index b5c9967..17598a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -20,57 +20,100 @@ package org.apache.hadoop.yarn.util.resource;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+
+import java.util.HashSet;
+import java.util.Set;
/**
- * A {@link ResourceCalculator} which uses the concept of
+ * A {@link ResourceCalculator} which uses the concept of
* <em>dominant resource</em> to compare multi-dimensional resources.
*
- * Essentially the idea is that the in a multi-resource environment,
- * the resource allocation should be determined by the dominant share
- * of an entity (user or queue), which is the maximum share that the
- * entity has been allocated of any resource.
- *
- * In a nutshell, it seeks to maximize the minimum dominant share across
- * all entities.
- *
+ * Essentially the idea is that in a multi-resource environment,
+ * the resource allocation should be determined by the dominant share
+ * of an entity (user or queue), which is the maximum share that the
+ * entity has been allocated of any resource.
+ *
+ * In a nutshell, it seeks to maximize the minimum dominant share across
+ * all entities.
+ *
* For example, if user A runs CPU-heavy tasks and user B runs
- * memory-heavy tasks, it attempts to equalize CPU share of user A
- * with Memory-share of user B.
- *
+ * memory-heavy tasks, it attempts to equalize CPU share of user A
+ * with Memory-share of user B.
+ *
* In the single resource case, it reduces to max-min fairness for that resource.
- *
+ *
* See the Dominant Resource Fairness paper for more details:
* www.cs.berkeley.edu/~matei/papers/2011/nsdi_drf.pdf
*/
@Private
@Unstable
public class DominantResourceCalculator extends ResourceCalculator {
-
+
+ private Set<String> resourceNames; // names of the resource types the methods of this class loop over
+
+ public DominantResourceCalculator() { // registers only the MEMORY and VCORES resource names
+ resourceNames = new HashSet<>();
+ resourceNames.add(ResourceInformation.MEMORY.getName());
+ resourceNames.add(ResourceInformation.VCORES.getName());
+ }
+
+ /**
+ * Compare two resources type by type, over every name in
+ * {@code resourceNames}. Returns 1 if the lhs is greater for at least one
+ * resource type and the rhs is greater for none, and -1 in the mirrored
+ * case. Returns 0 when neither side is greater for any type, or when each
+ * side is greater for some type (the two are then not ordered).
+ *
+ * @param lhs resource to be compared
+ * @param rhs resource to be compared
+ * @return 0, 1, or -1
+ * @throws IllegalArgumentException if looking up a resource type on either
+ * side throws a YarnException
+ */
+ private int compare(Resource lhs, Resource rhs) {
+ boolean lhsGreater = false; // set once lhs wins on any resource type
+ boolean rhsGreater = false; // set once rhs wins on any resource type
+ int ret = 0;
+
+ for (String rName : resourceNames) {
+ try {
+ ResourceInformation lhsResourceInformation =
+ lhs.getResourceInformation(rName);
+ ResourceInformation rhsResourceInformation =
+ rhs.getResourceInformation(rName);
+ int diff = lhsResourceInformation.compareTo(rhsResourceInformation);
+ if (diff >= 1) {
+ lhsGreater = true;
+ } else if (diff <= -1) {
+ rhsGreater = true;
+ }
+ } catch (YarnException ye) {
+ throw new IllegalArgumentException(
+ "Error getting resource information for " + rName, ye);
+ }
+ }
+ if (lhsGreater && rhsGreater) { // each side wins somewhere: report a tie
+ ret = 0;
+ } else if (lhsGreater) {
+ ret = 1;
+ } else if (rhsGreater) {
+ ret = -1;
+ }
+ return ret; // still 0 here when every resource type compared equal
+ }
+
@Override
public int compare(Resource clusterResource, Resource lhs, Resource rhs) {
-
if (lhs.equals(rhs)) {
return 0;
}
-
+
if (isInvalidDivisor(clusterResource)) {
- if ((lhs.getMemory() < rhs.getMemory() && lhs.getVirtualCores() > rhs
- .getVirtualCores())
- || (lhs.getMemory() > rhs.getMemory() && lhs.getVirtualCores() < rhs
- .getVirtualCores())) {
- return 0;
- } else if (lhs.getMemory() > rhs.getMemory()
- || lhs.getVirtualCores() > rhs.getVirtualCores()) {
- return 1;
- } else if (lhs.getMemory() < rhs.getMemory()
- || lhs.getVirtualCores() < rhs.getVirtualCores()) {
- return -1;
- }
+ return this.compare(lhs, rhs);
}
float l = getResourceAsValue(clusterResource, lhs, true);
float r = getResourceAsValue(clusterResource, rhs, true);
-
+
if (l < r) {
return -1;
} else if (l > r) {
@@ -78,135 +121,263 @@ public class DominantResourceCalculator extends ResourceCalculator {
} else {
l = getResourceAsValue(clusterResource, lhs, false);
r = getResourceAsValue(clusterResource, rhs, false);
+
if (l < r) {
return -1;
} else if (l > r) {
return 1;
}
}
-
+
return 0;
}
/**
* Use 'dominant' for now since we only have 2 resources - gives us a slight
* performance boost.
- *
+ * <p></p>
* Once we add more resources, we'll need a more complicated (and slightly
* less performant) algorithm.
*/
- protected float getResourceAsValue(
- Resource clusterResource, Resource resource, boolean dominant) {
- // Just use 'dominant' resource
- return (dominant) ?
- Math.max(
- (float)resource.getMemory() / clusterResource.getMemory(),
- (float)resource.getVirtualCores() / clusterResource.getVirtualCores()
- )
- :
- Math.min(
- (float)resource.getMemory() / clusterResource.getMemory(),
- (float)resource.getVirtualCores() / clusterResource.getVirtualCores()
- );
- }
-
+ protected float getResourceAsValue(Resource clusterResource,
+ Resource resource, boolean dominant) {
+ // For every resource type in resourceNames, compute the share
+ // resource / clusterResource (the resource value is first passed through
+ // UnitsConversionUtil.convert with the cluster resource's units) and
+ // return the largest share when 'dominant' is true, else the smallest.
+ // Throws IllegalArgumentException if a resource type lookup fails.
+ float min = Float.MAX_VALUE;
+ float max = 0.0f;
+ for (String rName : resourceNames) {
+ try {
+ ResourceInformation clusterResourceResourceInformation =
+ clusterResource.getResourceInformation(rName);
+ ResourceInformation resourceInformation =
+ resource.getResourceInformation(rName);
+ // Primitive long: the value is only used arithmetically, no boxing needed.
+ long resourceValue = UnitsConversionUtil
+ .convert(resourceInformation.getUnits(),
+ clusterResourceResourceInformation.getUnits(),
+ resourceInformation.getValue());
+ float tmp =
+ (float) resourceValue / (float) clusterResourceResourceInformation
+ .getValue();
+ // Ternaries rather than Math.min/Math.max: the two forms treat a NaN
+ // share (0 / 0) differently, so swapping them would change results.
+ min = min < tmp ? min : tmp;
+ max = max > tmp ? max : tmp;
+ } catch (YarnException ye) {
+ // Name the resource type that failed, as the other methods in this
+ // class do, instead of printing the whole Resource object.
+ throw new IllegalArgumentException(
+ "Error getting resource information for " + rName, ye);
+ }
+ }
+ return (dominant) ? max : min;
+ }
+
@Override
public int computeAvailableContainers(Resource available, Resource required) {
- return Math.min(
- available.getMemory() / required.getMemory(),
- available.getVirtualCores() / required.getVirtualCores());
+ // Number of 'required'-sized containers that fit into 'available': the
+ // minimum over all resource types of available / required (the required
+ // value is first passed through UnitsConversionUtil.convert with the
+ // available resource's units), capped at Integer.MAX_VALUE.
+ // Throws IllegalArgumentException if a resource type lookup fails.
+ long min = Long.MAX_VALUE;
+ for (String resource : resourceNames) {
+ try {
+ ResourceInformation availableResource =
+ available.getResourceInformation(resource);
+ ResourceInformation requiredResource =
+ required.getResourceInformation(resource);
+ long requiredResourceValue = UnitsConversionUtil
+ .convert(requiredResource.getUnits(), availableResource.getUnits(),
+ requiredResource.getValue());
+ if (requiredResourceValue == 0) {
+ // A resource type that is not requested cannot limit the container
+ // count; skipping it also avoids an integer division by zero.
+ continue;
+ }
+ long tmp = availableResource.getValue() / requiredResourceValue;
+ min = min < tmp ? min : tmp;
+ } catch (YarnException ye) {
+ throw new IllegalArgumentException(
+ "Error getting resource information for " + resource, ye);
+ }
+ }
+ // If no resource type constrained the result, min is still Long.MAX_VALUE
+ // and the cap below returns Integer.MAX_VALUE.
+ return min > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) min;
}
/**
 * Divides the dominant share of {@code numerator} by the dominant share of
 * {@code denominator}; both shares are obtained from
 * {@code getResourceAsValue(clusterResource, ..., true)}.
 */
@Override
- public float divide(Resource clusterResource,
+ public float divide(Resource clusterResource,
Resource numerator, Resource denominator) {
- return
- getResourceAsValue(clusterResource, numerator, true) /
+ return
+ getResourceAsValue(clusterResource, numerator, true) /
getResourceAsValue(clusterResource, denominator, true);
}
-
+
/**
 * Reports whether {@code r} is unusable as a divisor: returns true as soon
 * as any resource type named in {@code resourceNames} has a value equal to
 * 0, and false otherwise. A YarnException raised by the lookup is rethrown
 * as an IllegalArgumentException.
 */
@Override
public boolean isInvalidDivisor(Resource r) {
- if (r.getMemory() == 0.0f || r.getVirtualCores() == 0.0f) {
- return true;
+ for (String resource : resourceNames) {
+ try {
+ if (r.getResourceValue(resource).equals(0L)) { // one zero-valued type is enough
+ return true;
+ }
+ } catch (YarnException ye) {
+ throw new IllegalArgumentException(
+ "Error getting resource value for " + resource, ye);
+ }
}
return false;
}
/**
 * Returns the largest per-resource-type quotient a / b over the names in
 * {@code resourceNames}. The running maximum starts at 0, so 0 is returned
 * when no quotient exceeds it. Before dividing, b's value is passed through
 * UnitsConversionUtil.convert with b's units and a's units (presumably
 * converting b into a's units; confirm against the convert signature).
 * A YarnException raised by a lookup is rethrown as an
 * IllegalArgumentException.
 */
@Override
public float ratio(Resource a, Resource b) {
- return Math.max(
- (float)a.getMemory()/b.getMemory(),
- (float)a.getVirtualCores()/b.getVirtualCores()
- );
+ float ratio = 0.0f;
+ for (String resource : resourceNames) {
+ try {
+ ResourceInformation aResourceInformation =
+ a.getResourceInformation(resource);
+ ResourceInformation bResourceInformation =
+ b.getResourceInformation(resource);
+ Long bResourceValue = UnitsConversionUtil
+ .convert(bResourceInformation.getUnits(),
+ aResourceInformation.getUnits(),
+ bResourceInformation.getValue());
+ float tmp =
+ (float) aResourceInformation.getValue() / (float) bResourceValue; // float division: no exception on a zero divisor
+ ratio = ratio > tmp ? ratio : tmp; // keep the larger quotient
+ } catch (YarnException ye) {
+ throw new IllegalArgumentException(
+ "Error getting resource information for " + resource, ye);
+ }
+ }
+ return ratio;
}
/**
 * Builds a new Resource in which, for every name in {@code resourceNames},
 * the value is {@code divideAndCeil(value, denominator)} applied to a copy
 * of the numerator's ResourceInformation. The numerator's own
 * ResourceInformation objects are not modified: each one is copied with
 * ResourceInformation.newInstance before its value is set. A YarnException
 * raised by a lookup is rethrown as an IllegalArgumentException.
 */
@Override
public Resource divideAndCeil(Resource numerator, int denominator) {
- return Resources.createResource(
- divideAndCeil(numerator.getMemory(), denominator),
- divideAndCeil(numerator.getVirtualCores(), denominator)
- );
+ Resource ret = Resources.createResource(0, 0); // starting point; each listed type is overwritten below
+ for (String resource : resourceNames) {
+ try {
+ ResourceInformation resourceInformation = ResourceInformation
+ .newInstance(numerator.getResourceInformation(resource));
+ resourceInformation.setValue(
+ divideAndCeil(resourceInformation.getValue(), denominator));
+ ret.setResourceInformation(resource, resourceInformation);
+ } catch (YarnException ye) {
+ throw new IllegalArgumentException(
+ "Error getting resource information for " + resource, ye);
+ }
+ }
+ return ret;
}
/**
 * Builds a new Resource in which, for every name in {@code resourceNames},
 * the value is min(roundUp(max(r, minimum), stepFactor), maximum).
 * The minimum, maximum and step-factor values are each passed through
 * UnitsConversionUtil.convert with their own units and r's units first
 * (presumably converting them into r's units; confirm against the convert
 * signature). Each result is stored on a copy of r's ResourceInformation,
 * so {@code r} itself is not modified. A YarnException raised by a lookup
 * is rethrown as an IllegalArgumentException.
 */
@Override
public Resource normalize(Resource r, Resource minimumResource,
- Resource maximumResource, Resource stepFactor) {
- int normalizedMemory = Math.min(
- roundUp(
- Math.max(r.getMemory(), minimumResource.getMemory()),
- stepFactor.getMemory()),
- maximumResource.getMemory());
- int normalizedCores = Math.min(
- roundUp(
- Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()),
- stepFactor.getVirtualCores()),
- maximumResource.getVirtualCores());
- return Resources.createResource(normalizedMemory,
- normalizedCores);
+ Resource maximumResource, Resource stepFactor) {
+ Resource ret = Resources.createResource(0, 0);
+ for (String resource : resourceNames) {
+ try {
+ ResourceInformation rResourceInformation =
+ r.getResourceInformation(resource);
+ ResourceInformation minimumResourceInformation =
+ minimumResource.getResourceInformation(resource);
+ ResourceInformation maximumResourceInformation =
+ maximumResource.getResourceInformation(resource);
+ ResourceInformation stepFactorResourceInformation =
+ stepFactor.getResourceInformation(resource);
+ ResourceInformation tmp =
+ ResourceInformation.newInstance(rResourceInformation);
+
+ Long rValue = rResourceInformation.getValue();
+ Long minimumValue = UnitsConversionUtil
+ .convert(minimumResourceInformation.getUnits(),
+ rResourceInformation.getUnits(),
+ minimumResourceInformation.getValue());
+ Long maximumValue = UnitsConversionUtil
+ .convert(maximumResourceInformation.getUnits(),
+ rResourceInformation.getUnits(),
+ maximumResourceInformation.getValue());
+ Long stepFactorValue = UnitsConversionUtil
+ .convert(stepFactorResourceInformation.getUnits(),
+ rResourceInformation.getUnits(),
+ stepFactorResourceInformation.getValue());
+
+ tmp.setValue(
+ Math.min(roundUp(Math.max(rValue, minimumValue), stepFactorValue),
+ maximumValue)); // raise to the minimum, round up to the step, then cap at the maximum
+ ret.setResourceInformation(resource, tmp);
+ } catch (YarnException ye) {
+ throw new IllegalArgumentException(
+ "Error getting resource information for " + resource, ye);
+ }
+ }
+ return ret;
}
/**
 * Rounds {@code r} up against {@code stepFactor} by delegating to
 * {@code rounding(r, stepFactor, true)}.
 */
@Override
public Resource roundUp(Resource r, Resource stepFactor) {
- return Resources.createResource(
- roundUp(r.getMemory(), stepFactor.getMemory()),
- roundUp(r.getVirtualCores(), stepFactor.getVirtualCores())
- );
+ return this.rounding(r, stepFactor, true);
}
/**
 * Rounds {@code r} down against {@code stepFactor} by delegating to
 * {@code rounding(r, stepFactor, false)}.
 */
@Override
public Resource roundDown(Resource r, Resource stepFactor) {
- return Resources.createResource(
- roundDown(r.getMemory(), stepFactor.getMemory()),
- roundDown(r.getVirtualCores(), stepFactor.getVirtualCores())
- );
+ return this.rounding(r, stepFactor, false);
+ }
+
/**
 * Shared implementation of roundUp and roundDown. Builds a new Resource in
 * which, for every name in {@code resourceNames}, r's value is rounded with
 * roundUp(value, step) when {@code roundUp} is true and roundDown(value,
 * step) otherwise. The step value is first passed through
 * UnitsConversionUtil.convert with the step factor's units and r's units
 * (presumably converting it into r's units; confirm against the convert
 * signature). Results are stored on copies of r's ResourceInformation.
 * A YarnException raised by a lookup is rethrown as an
 * IllegalArgumentException.
 */
+ private Resource rounding(Resource r, Resource stepFactor, boolean roundUp) {
+ Resource ret = Resources.createResource(0, 0);
+ for (String resource : resourceNames) {
+ try {
+ ResourceInformation rResourceInformation =
+ r.getResourceInformation(resource);
+ ResourceInformation stepFactorResourceInformation =
+ stepFactor.getResourceInformation(resource);
+ ResourceInformation tmp =
+ ResourceInformation.newInstance(rResourceInformation);
+
+ Long rValue = rResourceInformation.getValue();
+ Long stepFactorValue = UnitsConversionUtil
+ .convert(stepFactorResourceInformation.getUnits(),
+ rResourceInformation.getUnits(),
+ stepFactorResourceInformation.getValue());
+
+ Long value = roundUp ? roundUp(rValue, stepFactorValue) :
+ roundDown(rValue, stepFactorValue); // the boolean parameter picks the direction
+ tmp.setValue(value);
+ ret.setResourceInformation(resource, tmp);
+ } catch (YarnException ye) {
+ throw new IllegalArgumentException(
+ "Error getting resource information for " + resource, ye);
+ }
+ }
+ return ret;
}
/**
 * Multiplies {@code r} by {@code by} and rounds up against
 * {@code stepFactor} by delegating to
 * {@code multiplyAndNormalize(r, by, stepFactor, true)}.
 */
@Override
public Resource multiplyAndNormalizeUp(Resource r, double by,
Resource stepFactor) {
- return Resources.createResource(
- roundUp(
- (int)Math.ceil(r.getMemory() * by), stepFactor.getMemory()),
- roundUp(
- (int)Math.ceil(r.getVirtualCores() * by),
- stepFactor.getVirtualCores())
- );
+ return this.multiplyAndNormalize(r, by, stepFactor, true);
}
/**
 * Multiplies {@code r} by {@code by} and rounds down against
 * {@code stepFactor} by delegating to
 * {@code multiplyAndNormalize(r, by, stepFactor, false)}.
 */
@Override
public Resource multiplyAndNormalizeDown(Resource r, double by,
Resource stepFactor) {
- return Resources.createResource(
- roundDown(
- (int)(r.getMemory() * by),
- stepFactor.getMemory()
- ),
- roundDown(
- (int)(r.getVirtualCores() * by),
- stepFactor.getVirtualCores()
- )
- );
+ return this.multiplyAndNormalize(r, by, stepFactor, false);
+ }
+
/**
 * Shared implementation of multiplyAndNormalizeUp and
 * multiplyAndNormalizeDown. Builds a new Resource in which, for every name
 * in {@code resourceNames}, r's value is multiplied by {@code by} and then
 * rounded against the step value. When {@code roundUp} is true the product
 * goes through Math.ceil and roundUp; otherwise it is cast to long and goes
 * through roundDown. The step value is first passed through
 * UnitsConversionUtil.convert with the step factor's units and r's units
 * (presumably converting it into r's units; confirm against the convert
 * signature). Results are stored on copies of r's ResourceInformation.
 * A YarnException raised by a lookup is rethrown as an
 * IllegalArgumentException.
 */
+ private Resource multiplyAndNormalize(Resource r, double by,
+ Resource stepFactor, boolean roundUp) {
+ Resource ret = Resources.createResource(0, 0);
+ for (String resource : resourceNames) {
+ try {
+ ResourceInformation rResourceInformation =
+ r.getResourceInformation(resource);
+ ResourceInformation stepFactorResourceInformation =
+ stepFactor.getResourceInformation(resource);
+ ResourceInformation tmp =
+ ResourceInformation.newInstance(rResourceInformation);
+
+ Long rValue = rResourceInformation.getValue();
+ Long stepFactorValue = UnitsConversionUtil
+ .convert(stepFactorResourceInformation.getUnits(),
+ rResourceInformation.getUnits(),
+ stepFactorResourceInformation.getValue());
+
+ Long value =
+ roundUp ? roundUp((long) Math.ceil(rValue * by), stepFactorValue) :
+ roundDown((long) (rValue * by), stepFactorValue); // the (long) cast drops the fractional part
+ tmp.setValue(value);
+ ret.setResourceInformation(resource, tmp);
+ } catch (YarnException ye) {
+ throw new IllegalArgumentException(
+ "Error getting resource information for " + resource, ye);
+ }
+ }
+ return ret;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f404e034/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
index 3a31225..17e768e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
@@ -38,14 +38,29 @@ public abstract class ResourceCalculator {
return (a + (b - 1)) / b;
}
/**
 * Long overload of divideAndCeil: returns (a + (b - 1)) / b, which equals
 * the ceiling of a / b for non-negative a and positive b. Returns 0 when b
 * is 0 instead of dividing.
 */
+ public static long divideAndCeil(long a, long b) {
+ if (b == 0) {
+ return 0; // guard: a zero divisor yields 0 rather than an ArithmeticException
+ }
+ return (a + (b - 1)) / b; // NOTE(review): a + (b - 1) can overflow for values near Long.MAX_VALUE
+ }
+
/**
 * Returns divideAndCeil(a, b) * b, i.e. a raised to a multiple of b when
 * divideAndCeil yields the ceiling of a / b.
 */
public static int roundUp(int a, int b) {
return divideAndCeil(a, b) * b;
}
/**
 * Long overload of roundUp: returns divideAndCeil(a, b) * b, i.e. a raised
 * to a multiple of b for non-negative a and positive b. Because the long
 * divideAndCeil returns 0 for b == 0, this returns 0 in that case.
 */
+ public static long roundUp(long a, long b) {
+ return divideAndCeil(a, b) * b;
+ }
+
/**
 * Returns (a / b) * b using integer division, i.e. a lowered to a multiple
 * of b for non-negative a and positive b. There is no guard for b == 0:
 * the division then throws ArithmeticException.
 */
public static int roundDown(int a, int b) {
return (a / b) * b;
}
/**
 * Long overload of roundDown: returns (a / b) * b using integer division,
 * i.e. a lowered to a multiple of b for non-negative a and positive b.
 * Unlike the long divideAndCeil, there is no guard for b == 0: the division
 * then throws ArithmeticException.
 */
+ public static long roundDown(long a, long b) {
+ return (a / b) * b;
+ }
+
/**
* Compute the number of containers which can be allocated given
* <code>available</code> and <code>required</code> resources.