You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/08/11 17:32:30 UTC
[50/50] [abbrv] hadoop git commit: YARN-6471. Support to add min/max
resource configuration for a queue. (Sunil G via wangda)
YARN-6471. Support to add min/max resource configuration for a queue. (Sunil G via wangda)
Change-Id: I9213f5297a6841fab5c573e85ee4c4e5f4a0b7ff
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/95a81934
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/95a81934
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/95a81934
Branch: refs/heads/YARN-5881
Commit: 95a81934385a1a0f404930b8075e2a066fc6c413
Parents: 4222c97
Author: Wangda Tan <wa...@apache.org>
Authored: Fri Aug 11 10:30:23 2017 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Fri Aug 11 10:30:23 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/util/StringUtils.java | 31 ++
.../hadoop/yarn/util/UnitsConversionUtil.java | 217 ++++++++
.../resource/DefaultResourceCalculator.java | 6 +
.../resource/DominantResourceCalculator.java | 7 +
.../yarn/util/resource/ResourceCalculator.java | 12 +
.../hadoop/yarn/util/resource/Resources.java | 5 +
.../capacity/FifoCandidatesSelector.java | 9 +-
.../ProportionalCapacityPreemptionPolicy.java | 10 +-
.../monitor/capacity/TempQueuePerPartition.java | 16 +-
.../scheduler/AbstractResourceUsage.java | 198 +++++++
.../scheduler/QueueResourceQuotas.java | 153 ++++++
.../scheduler/ResourceUsage.java | 237 ++-------
.../scheduler/capacity/AbstractCSQueue.java | 162 +++++-
.../scheduler/capacity/CSQueue.java | 42 +-
.../scheduler/capacity/CSQueueUtils.java | 24 +-
.../CapacitySchedulerConfiguration.java | 179 ++++++-
.../scheduler/capacity/LeafQueue.java | 31 +-
.../scheduler/capacity/ParentQueue.java | 203 +++++++-
.../scheduler/capacity/UsersManager.java | 5 +-
.../PriorityUtilizationQueueOrderingPolicy.java | 11 +
.../webapp/dao/CapacitySchedulerQueueInfo.java | 15 +
.../yarn/server/resourcemanager/MockNM.java | 8 +
.../yarn/server/resourcemanager/MockRM.java | 6 +
...alCapacityPreemptionPolicyMockFramework.java | 13 +
...estProportionalCapacityPreemptionPolicy.java | 29 +-
...pacityPreemptionPolicyIntraQueueWithDRF.java | 6 +-
.../TestAbsoluteResourceConfiguration.java | 516 +++++++++++++++++++
.../capacity/TestApplicationLimits.java | 30 +-
.../TestApplicationLimitsByPartition.java | 4 +
.../capacity/TestCapacityScheduler.java | 2 +-
.../scheduler/capacity/TestChildQueueOrder.java | 2 +
.../scheduler/capacity/TestLeafQueue.java | 261 ++++------
.../scheduler/capacity/TestParentQueue.java | 8 +
.../scheduler/capacity/TestReservations.java | 17 +
...tPriorityUtilizationQueueOrderingPolicy.java | 3 +
.../webapp/TestRMWebServicesCapacitySched.java | 4 +-
36 files changed, 2046 insertions(+), 436 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
index cda5ec7..1be8a08 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
@@ -1152,4 +1152,35 @@ public class StringUtils {
return s1.equalsIgnoreCase(s2);
}
+ /**
+ * <p>Checks if the String contains only unicode letters.</p>
+ *
+ * <p><code>null</code> will return <code>false</code>.
+ * An empty String (length()=0) will return <code>true</code>.</p>
+ *
+ * <pre>
+ * StringUtils.isAlpha(null) = false
+ * StringUtils.isAlpha("") = true
+ * StringUtils.isAlpha(" ") = false
+ * StringUtils.isAlpha("abc") = true
+ * StringUtils.isAlpha("ab2c") = false
+ * StringUtils.isAlpha("ab-c") = false
+ * </pre>
+ *
+ * @param str the String to check, may be null
+ * @return <code>true</code> if only contains letters, and is non-null
+ */
+ public static boolean isAlpha(String str) {
+ if (str == null) {
+ return false;
+ }
+ int sz = str.length();
+ for (int i = 0; i < sz; i++) {
+ if (Character.isLetter(str.charAt(i)) == false) {
+ return false;
+ }
+ }
+ return true;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
new file mode 100644
index 0000000..79ee0f7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.math.BigInteger;
+import java.util.*;
+
+/**
+ * A util to convert values in one unit to another. Units refers to whether
+ * the value is expressed in pico, nano, etc.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class UnitsConversionUtil {
+
+ /**
+ * Helper class for encapsulating conversion values.
+ */
+ public static class Converter {
+ private long numerator;
+ private long denominator;
+
+ Converter(long n, long d) {
+ this.numerator = n;
+ this.denominator = d;
+ }
+ }
+
+ private static final String[] UNITS =
+ { "p", "n", "u", "m", "", "k", "M", "G", "T", "P", "Ki", "Mi", "Gi", "Ti",
+ "Pi" };
+ private static final List<String> SORTED_UNITS = Arrays.asList(UNITS);
+ public static final Set<String> KNOWN_UNITS = createKnownUnitsSet();
+ private static final Converter PICO =
+ new Converter(1L, 1000L * 1000L * 1000L * 1000L);
+ private static final Converter NANO =
+ new Converter(1L, 1000L * 1000L * 1000L);
+ private static final Converter MICRO = new Converter(1L, 1000L * 1000L);
+ private static final Converter MILLI = new Converter(1L, 1000L);
+ private static final Converter BASE = new Converter(1L, 1L);
+ private static final Converter KILO = new Converter(1000L, 1L);
+ private static final Converter MEGA = new Converter(1000L * 1000L, 1L);
+ private static final Converter GIGA =
+ new Converter(1000L * 1000L * 1000L, 1L);
+ private static final Converter TERA =
+ new Converter(1000L * 1000L * 1000L * 1000L, 1L);
+ private static final Converter PETA =
+ new Converter(1000L * 1000L * 1000L * 1000L * 1000L, 1L);
+
+ private static final Converter KILO_BINARY = new Converter(1024L, 1L);
+ private static final Converter MEGA_BINARY = new Converter(1024L * 1024L, 1L);
+ private static final Converter GIGA_BINARY =
+ new Converter(1024L * 1024L * 1024L, 1L);
+ private static final Converter TERA_BINARY =
+ new Converter(1024L * 1024L * 1024L * 1024L, 1L);
+ private static final Converter PETA_BINARY =
+ new Converter(1024L * 1024L * 1024L * 1024L * 1024L, 1L);
+
+ private static Set<String> createKnownUnitsSet() {
+ Set<String> ret = new HashSet<>();
+ ret.addAll(Arrays.asList(UNITS));
+ return ret;
+ }
+
+ private static Converter getConverter(String unit) {
+ switch (unit) {
+ case "p":
+ return PICO;
+ case "n":
+ return NANO;
+ case "u":
+ return MICRO;
+ case "m":
+ return MILLI;
+ case "":
+ return BASE;
+ case "k":
+ return KILO;
+ case "M":
+ return MEGA;
+ case "G":
+ return GIGA;
+ case "T":
+ return TERA;
+ case "P":
+ return PETA;
+ case "Ki":
+ return KILO_BINARY;
+ case "Mi":
+ return MEGA_BINARY;
+ case "Gi":
+ return GIGA_BINARY;
+ case "Ti":
+ return TERA_BINARY;
+ case "Pi":
+ return PETA_BINARY;
+ default:
+ throw new IllegalArgumentException(
+ "Unknown unit '" + unit + "'. Known units are " + KNOWN_UNITS);
+ }
+ }
+
+ /**
+ * Converts a value from one unit to another. Supported units can be obtained
+ * by inspecting the KNOWN_UNITS set.
+ *
+ * @param fromUnit the unit of the from value
+ * @param toUnit the target unit
+ * @param fromValue the value you wish to convert
+ * @return the value in toUnit
+ */
+ public static Long convert(String fromUnit, String toUnit, Long fromValue) {
+ if (toUnit == null || fromUnit == null || fromValue == null) {
+ throw new IllegalArgumentException("One or more arguments are null");
+ }
+ String overflowMsg =
+ "Converting " + fromValue + " from '" + fromUnit + "' to '" + toUnit
+ + "' will result in an overflow of Long";
+ if (fromUnit.equals(toUnit)) {
+ return fromValue;
+ }
+ Converter fc = getConverter(fromUnit);
+ Converter tc = getConverter(toUnit);
+ Long numerator = fc.numerator * tc.denominator;
+ Long denominator = fc.denominator * tc.numerator;
+ Long numeratorMultiplierLimit = Long.MAX_VALUE / numerator;
+ if (numerator < denominator) {
+ if (numeratorMultiplierLimit < fromValue) {
+ throw new IllegalArgumentException(overflowMsg);
+ }
+ return (fromValue * numerator) / denominator;
+ }
+ if (numeratorMultiplierLimit > fromValue) {
+ return (numerator * fromValue) / denominator;
+ }
+ Long tmp = numerator / denominator;
+ if ((Long.MAX_VALUE / tmp) < fromValue) {
+ throw new IllegalArgumentException(overflowMsg);
+ }
+ return fromValue * tmp;
+ }
+
+ /**
+ * Compare a value in a given unit with a value in another unit. The return
+ * value is equivalent to the value returned by compareTo.
+ *
+ * @param unitA first unit
+ * @param valueA first value
+ * @param unitB second unit
+ * @param valueB second value
+ * @return +1, 0 or -1 depending on whether the relationship is greater than,
+ * equal to or lesser than
+ */
+ public static int compare(String unitA, Long valueA, String unitB,
+ Long valueB) {
+ if (unitA == null || unitB == null || !KNOWN_UNITS.contains(unitA)
+ || !KNOWN_UNITS.contains(unitB)) {
+ throw new IllegalArgumentException("Units cannot be null");
+ }
+ if (!KNOWN_UNITS.contains(unitA)) {
+ throw new IllegalArgumentException("Unknown unit '" + unitA + "'");
+ }
+ if (!KNOWN_UNITS.contains(unitB)) {
+ throw new IllegalArgumentException("Unknown unit '" + unitB + "'");
+ }
+ Converter unitAC = getConverter(unitA);
+ Converter unitBC = getConverter(unitB);
+ if (unitA.equals(unitB)) {
+ return valueA.compareTo(valueB);
+ }
+ int unitAPos = SORTED_UNITS.indexOf(unitA);
+ int unitBPos = SORTED_UNITS.indexOf(unitB);
+ try {
+ Long tmpA = valueA;
+ Long tmpB = valueB;
+ if (unitAPos < unitBPos) {
+ tmpB = convert(unitB, unitA, valueB);
+ } else {
+ tmpA = convert(unitA, unitB, valueA);
+ }
+ return tmpA.compareTo(tmpB);
+ } catch (IllegalArgumentException ie) {
+ BigInteger tmpA = BigInteger.valueOf(valueA);
+ BigInteger tmpB = BigInteger.valueOf(valueB);
+ if (unitAPos < unitBPos) {
+ tmpB = tmpB.multiply(BigInteger.valueOf(unitBC.numerator));
+ tmpB = tmpB.multiply(BigInteger.valueOf(unitAC.denominator));
+ tmpB = tmpB.divide(BigInteger.valueOf(unitBC.denominator));
+ tmpB = tmpB.divide(BigInteger.valueOf(unitAC.numerator));
+ } else {
+ tmpA = tmpA.multiply(BigInteger.valueOf(unitAC.numerator));
+ tmpA = tmpA.multiply(BigInteger.valueOf(unitBC.denominator));
+ tmpA = tmpA.divide(BigInteger.valueOf(unitAC.denominator));
+ tmpA = tmpA.divide(BigInteger.valueOf(unitBC.numerator));
+ }
+ return tmpA.compareTo(tmpB);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
index bdf60bd..764deaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
@@ -132,4 +132,10 @@ public class DefaultResourceCalculator extends ResourceCalculator {
public boolean isAnyMajorResourceZero(Resource resource) {
return resource.getMemorySize() == 0f;
}
+
+ @Override
+ public Resource normalizeDown(Resource r, Resource stepFactor) {
+ return Resources.createResource(
+ roundDown((r.getMemorySize()), stepFactor.getMemorySize()));
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 7697e1d..05ddb41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -244,4 +244,11 @@ public class DominantResourceCalculator extends ResourceCalculator {
public boolean isAnyMajorResourceZero(Resource resource) {
return resource.getMemorySize() == 0f || resource.getVirtualCores() == 0;
}
+
+ @Override
+ public Resource normalizeDown(Resource r, Resource stepFactor) {
+ return Resources.createResource(
+ roundDown(r.getMemorySize(), stepFactor.getMemorySize()),
+ roundDown(r.getVirtualCores(), stepFactor.getVirtualCores()));
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
index 398dac5..013b723 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
@@ -236,4 +236,16 @@ public abstract class ResourceCalculator {
* @return returns true if any resource is zero.
*/
public abstract boolean isAnyMajorResourceZero(Resource resource);
+
+ /**
+ * Get resource <code>r</code>and normalize down using step-factor
+ * <code>stepFactor</code>.
+ *
+ * @param r
+ * resource to be multiplied
+ * @param stepFactor
+ * factor by which to normalize down
+ * @return resulting normalized resource
+ */
+ public abstract Resource normalizeDown(Resource r, Resource stepFactor);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index a1d14fd..3972ec2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -355,4 +355,9 @@ public class Resources {
Resource resource) {
return rc.isAnyMajorResourceZero(resource);
}
+
+ public static Resource normalizeDown(ResourceCalculator calculator,
+ Resource resource, Resource factor) {
+ return calculator.normalizeDown(resource, factor);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index f843db4..748548a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -140,10 +140,10 @@ public class FifoCandidatesSelector
// Can try preempting AMContainers (still saving atmost
// maxAMCapacityForThisQueue AMResource's) if more resources are
// required to be preemptionCandidates from this Queue.
- Resource maxAMCapacityForThisQueue = Resources.multiply(
- Resources.multiply(clusterResource,
- leafQueue.getAbsoluteCapacity()),
- leafQueue.getMaxAMResourcePerQueuePercent());
+ Resource maxAMCapacityForThisQueue = Resources
+ .multiply(
+ leafQueue.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL),
+ leafQueue.getMaxAMResourcePerQueuePercent());
preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,
resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
@@ -199,7 +199,6 @@ public class FifoCandidatesSelector
* Given a target preemption for a specific application, select containers
* to preempt (after unreserving all reservation for that app).
*/
- @SuppressWarnings("unchecked")
private void preemptFrom(FiCaSchedulerApp app,
Resource clusterResource, Map<String, Resource> resToObtainByPartition,
List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index fc8ad2b..8b6fa3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolic
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@@ -486,6 +487,13 @@ public class ProportionalCapacityPreemptionPolicy
float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
boolean preemptionDisabled = curQueue.getPreemptionDisabled();
+ QueueResourceQuotas queueResourceQuotas = curQueue
+ .getQueueResourceQuotas();
+ Resource effMinRes = queueResourceQuotas
+ .getEffectiveMinResource(partitionToLookAt);
+ Resource effMaxRes = queueResourceQuotas
+ .getEffectiveMaxResource(partitionToLookAt);
+
Resource current = Resources
.clone(curQueue.getQueueResourceUsage().getUsed(partitionToLookAt));
Resource killable = Resources.none();
@@ -511,7 +519,7 @@ public class ProportionalCapacityPreemptionPolicy
ret = new TempQueuePerPartition(queueName, current, preemptionDisabled,
partitionToLookAt, killable, absCap, absMaxCap, partitionResource,
- reserved, curQueue);
+ reserved, curQueue, effMinRes, effMaxRes);
if (curQueue instanceof ParentQueue) {
String configuredOrderingPolicy =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 89452f9..bd236fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -48,6 +48,9 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
double normalizedGuarantee;
+ private Resource effMinRes;
+ private Resource effMaxRes;
+
final ArrayList<TempQueuePerPartition> children;
private Collection<TempAppPerPartition> apps;
LeafQueue leafQueue;
@@ -68,7 +71,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
TempQueuePerPartition(String queueName, Resource current,
boolean preemptionDisabled, String partition, Resource killable,
float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
- Resource reserved, CSQueue queue) {
+ Resource reserved, CSQueue queue, Resource effMinRes,
+ Resource effMaxRes) {
super(queueName, current, Resource.newInstance(0, 0), reserved,
Resource.newInstance(0, 0));
@@ -95,6 +99,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
this.absCapacity = absCapacity;
this.absMaxCapacity = absMaxCapacity;
this.totalPartitionResource = totalPartitionResource;
+ this.effMinRes = effMinRes;
+ this.effMaxRes = effMaxRes;
}
public void setLeafQueue(LeafQueue l) {
@@ -177,10 +183,18 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
}
public Resource getGuaranteed() {
+ if(!effMinRes.equals(Resources.none())) {
+ return Resources.clone(effMinRes);
+ }
+
return Resources.multiply(totalPartitionResource, absCapacity);
}
public Resource getMax() {
+ if(!effMaxRes.equals(Resources.none())) {
+ return Resources.clone(effMaxRes);
+ }
+
return Resources.multiply(totalPartitionResource, absMaxCapacity);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java
new file mode 100644
index 0000000..c295323
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * This class can be used to track resource usage in queue/user/app.
+ *
+ * And it is thread-safe
+ */
+public class AbstractResourceUsage {
+ protected ReadLock readLock;
+ protected WriteLock writeLock;
+ protected Map<String, UsageByLabel> usages;
+ // short for no-label :)
+ private static final String NL = CommonNodeLabelsManager.NO_LABEL;
+
+ public AbstractResourceUsage() {
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ readLock = lock.readLock();
+ writeLock = lock.writeLock();
+
+ usages = new HashMap<String, UsageByLabel>();
+ usages.put(NL, new UsageByLabel(NL));
+ }
+
+ // Usage enum here to make implement cleaner
+ public enum ResourceType {
+ // CACHED_USED and CACHED_PENDING may be read by anyone, but must only
+ // be written by ordering policies
+ USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4), CACHED_PENDING(
+ 5), AMLIMIT(6), MIN_RESOURCE(7), MAX_RESOURCE(8), EFF_MIN_RESOURCE(
+ 9), EFF_MAX_RESOURCE(
+ 10), EFF_MIN_RESOURCE_UP(11), EFF_MAX_RESOURCE_UP(12);
+
+ private int idx;
+
+ private ResourceType(int value) {
+ this.idx = value;
+ }
+ }
+
+ public static class UsageByLabel {
+ // usage by label, contains all UsageType
+ private Resource[] resArr;
+
+ public UsageByLabel(String label) {
+ resArr = new Resource[ResourceType.values().length];
+ for (int i = 0; i < resArr.length; i++) {
+ resArr[i] = Resource.newInstance(0, 0);
+ };
+ }
+
+ public Resource getUsed() {
+ return resArr[ResourceType.USED.idx];
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{used=" + resArr[0] + "%, ");
+ sb.append("pending=" + resArr[1] + "%, ");
+ sb.append("am_used=" + resArr[2] + "%, ");
+ sb.append("reserved=" + resArr[3] + "%}");
+ sb.append("min_eff=" + resArr[9] + "%, ");
+ sb.append("max_eff=" + resArr[10] + "%}");
+ sb.append("min_effup=" + resArr[11] + "%, ");
+ return sb.toString();
+ }
+ }
+
+ private static Resource normalize(Resource res) {
+ if (res == null) {
+ return Resources.none();
+ }
+ return res;
+ }
+
+ protected Resource _get(String label, ResourceType type) {
+ if (label == null) {
+ label = RMNodeLabelsManager.NO_LABEL;
+ }
+
+ try {
+ readLock.lock();
+ UsageByLabel usage = usages.get(label);
+ if (null == usage) {
+ return Resources.none();
+ }
+ return normalize(usage.resArr[type.idx]);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ protected Resource _getAll(ResourceType type) {
+ try {
+ readLock.lock();
+ Resource allOfType = Resources.createResource(0);
+ for (Map.Entry<String, UsageByLabel> usageEntry : usages.entrySet()) {
+ //all usages types are initialized
+ Resources.addTo(allOfType, usageEntry.getValue().resArr[type.idx]);
+ }
+ return allOfType;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private UsageByLabel getAndAddIfMissing(String label) {
+ if (label == null) {
+ label = RMNodeLabelsManager.NO_LABEL;
+ }
+ if (!usages.containsKey(label)) {
+ UsageByLabel u = new UsageByLabel(label);
+ usages.put(label, u);
+ return u;
+ }
+
+ return usages.get(label);
+ }
+
+ protected void _set(String label, ResourceType type, Resource res) {
+ try {
+ writeLock.lock();
+ UsageByLabel usage = getAndAddIfMissing(label);
+ usage.resArr[type.idx] = res;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ protected void _inc(String label, ResourceType type, Resource res) {
+ try {
+ writeLock.lock();
+ UsageByLabel usage = getAndAddIfMissing(label);
+ Resources.addTo(usage.resArr[type.idx], res);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ protected void _dec(String label, ResourceType type, Resource res) {
+ try {
+ writeLock.lock();
+ UsageByLabel usage = getAndAddIfMissing(label);
+ Resources.subtractFrom(usage.resArr[type.idx], res);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public String toString() {
+ try {
+ readLock.lock();
+ return usages.toString();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public Set<String> getNodePartitionsSet() {
+ try {
+ readLock.lock();
+ return usages.keySet();
+ } finally {
+ readLock.unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueResourceQuotas.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueResourceQuotas.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueResourceQuotas.java
new file mode 100644
index 0000000..2e653fc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueResourceQuotas.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+
+/**
+ * QueueResourceQuotas by Labels for following fields by label
+ * - EFFECTIVE_MIN_CAPACITY
+ * - EFFECTIVE_MAX_CAPACITY
+ * This class can be used to track resource usage in queue/user/app.
+ *
+ * And it is thread-safe
+ */
+public class QueueResourceQuotas extends AbstractResourceUsage {
+ // short for no-label :)
+ private static final String NL = CommonNodeLabelsManager.NO_LABEL;
+
+ public QueueResourceQuotas() {
+ super();
+ }
+
+ /*
+ * Configured Minimum Resource
+ */
+ public Resource getConfiguredMinResource() {
+ return _get(NL, ResourceType.MIN_RESOURCE);
+ }
+
+ public Resource getConfiguredMinResource(String label) {
+ return _get(label, ResourceType.MIN_RESOURCE);
+ }
+
+ public void setConfiguredMinResource(String label, Resource res) {
+ _set(label, ResourceType.MIN_RESOURCE, res);
+ }
+
+ public void setConfiguredMinResource(Resource res) {
+ _set(NL, ResourceType.MIN_RESOURCE, res);
+ }
+
+ /*
+ * Configured Maximum Resource
+ */
+ public Resource getConfiguredMaxResource() {
+ return getConfiguredMaxResource(NL);
+ }
+
+ public Resource getConfiguredMaxResource(String label) {
+ return _get(label, ResourceType.MAX_RESOURCE);
+ }
+
+ public void setConfiguredMaxResource(Resource res) {
+ setConfiguredMaxResource(NL, res);
+ }
+
+ public void setConfiguredMaxResource(String label, Resource res) {
+ _set(label, ResourceType.MAX_RESOURCE, res);
+ }
+
+ /*
+ * Effective Minimum Resource
+ */
+ public Resource getEffectiveMinResource() {
+ return _get(NL, ResourceType.EFF_MIN_RESOURCE);
+ }
+
+ public Resource getEffectiveMinResource(String label) {
+ return _get(label, ResourceType.EFF_MIN_RESOURCE);
+ }
+
+ public void setEffectiveMinResource(String label, Resource res) {
+ _set(label, ResourceType.EFF_MIN_RESOURCE, res);
+ }
+
+ public void setEffectiveMinResource(Resource res) {
+ _set(NL, ResourceType.EFF_MIN_RESOURCE, res);
+ }
+
+ /*
+ * Effective Maximum Resource
+ */
+ public Resource getEffectiveMaxResource() {
+ return getEffectiveMaxResource(NL);
+ }
+
+ public Resource getEffectiveMaxResource(String label) {
+ return _get(label, ResourceType.EFF_MAX_RESOURCE);
+ }
+
+ public void setEffectiveMaxResource(Resource res) {
+ setEffectiveMaxResource(NL, res);
+ }
+
+ public void setEffectiveMaxResource(String label, Resource res) {
+ _set(label, ResourceType.EFF_MAX_RESOURCE, res);
+ }
+
+ /*
+ * Effective Minimum Resource
+ */
+ public Resource getEffectiveMinResourceUp() {
+ return _get(NL, ResourceType.EFF_MIN_RESOURCE_UP);
+ }
+
+ public Resource getEffectiveMinResourceUp(String label) {
+ return _get(label, ResourceType.EFF_MIN_RESOURCE_UP);
+ }
+
+ public void setEffectiveMinResourceUp(String label, Resource res) {
+ _set(label, ResourceType.EFF_MIN_RESOURCE_UP, res);
+ }
+
+ public void setEffectiveMinResourceUp(Resource res) {
+ _set(NL, ResourceType.EFF_MIN_RESOURCE_UP, res);
+ }
+
+ /*
+ * Effective Maximum Resource
+ */
+ public Resource getEffectiveMaxResourceUp() {
+ return getEffectiveMaxResourceUp(NL);
+ }
+
+ public Resource getEffectiveMaxResourceUp(String label) {
+ return _get(label, ResourceType.EFF_MAX_RESOURCE_UP);
+ }
+
+ public void setEffectiveMaxResourceUp(Resource res) {
+ setEffectiveMaxResourceUp(NL, res);
+ }
+
+ public void setEffectiveMaxResourceUp(String label, Resource res) {
+ _set(label, ResourceType.EFF_MAX_RESOURCE_UP, res);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
index 6f0c7d2..ede4aec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
@@ -39,63 +39,12 @@ import org.apache.hadoop.yarn.util.resource.Resources;
*
* And it is thread-safe
*/
-public class ResourceUsage {
- private ReadLock readLock;
- private WriteLock writeLock;
- private Map<String, UsageByLabel> usages;
+public class ResourceUsage extends AbstractResourceUsage {
// short for no-label :)
private static final String NL = CommonNodeLabelsManager.NO_LABEL;
- private final UsageByLabel usageNoLabel;
public ResourceUsage() {
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- readLock = lock.readLock();
- writeLock = lock.writeLock();
-
- usages = new HashMap<String, UsageByLabel>();
- usageNoLabel = new UsageByLabel(NL);
- usages.put(NL, usageNoLabel);
- }
-
- // Usage enum here to make implement cleaner
- private enum ResourceType {
- //CACHED_USED and CACHED_PENDING may be read by anyone, but must only
- //be written by ordering policies
- USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4),
- CACHED_PENDING(5), AMLIMIT(6);
-
- private int idx;
-
- private ResourceType(int value) {
- this.idx = value;
- }
- }
-
- private static class UsageByLabel {
- // usage by label, contains all UsageType
- private Resource[] resArr;
-
- public UsageByLabel(String label) {
- resArr = new Resource[ResourceType.values().length];
- for (int i = 0; i < resArr.length; i++) {
- resArr[i] = Resource.newInstance(0, 0);
- };
- }
-
- public Resource getUsed() {
- return resArr[ResourceType.USED.idx];
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("{used=" + resArr[0] + "%, ");
- sb.append("pending=" + resArr[1] + "%, ");
- sb.append("am_used=" + resArr[2] + "%, ");
- sb.append("reserved=" + resArr[3] + "%}");
- sb.append("am_limit=" + resArr[6] + "%, ");
- return sb.toString();
- }
+ super();
}
/*
@@ -109,22 +58,6 @@ public class ResourceUsage {
return _get(label, ResourceType.USED);
}
- public Resource getCachedUsed() {
- return _get(NL, ResourceType.CACHED_USED);
- }
-
- public Resource getCachedUsed(String label) {
- return _get(label, ResourceType.CACHED_USED);
- }
-
- public Resource getCachedPending() {
- return _get(NL, ResourceType.CACHED_PENDING);
- }
-
- public Resource getCachedPending(String label) {
- return _get(label, ResourceType.CACHED_PENDING);
- }
-
public void incUsed(String label, Resource res) {
_inc(label, ResourceType.USED, res);
}
@@ -145,7 +78,7 @@ public class ResourceUsage {
setUsed(NL, res);
}
- public void copyAllUsed(ResourceUsage other) {
+ public void copyAllUsed(AbstractResourceUsage other) {
try {
writeLock.lock();
for (Entry<String, UsageByLabel> entry : other.usages.entrySet()) {
@@ -160,22 +93,6 @@ public class ResourceUsage {
_set(label, ResourceType.USED, res);
}
- public void setCachedUsed(String label, Resource res) {
- _set(label, ResourceType.CACHED_USED, res);
- }
-
- public void setCachedUsed(Resource res) {
- _set(NL, ResourceType.CACHED_USED, res);
- }
-
- public void setCachedPending(String label, Resource res) {
- _set(label, ResourceType.CACHED_PENDING, res);
- }
-
- public void setCachedPending(Resource res) {
- _set(NL, ResourceType.CACHED_PENDING, res);
- }
-
/*
* Pending
*/
@@ -281,6 +198,47 @@ public class ResourceUsage {
_set(label, ResourceType.AMUSED, res);
}
+ public Resource getAllPending() {
+ return _getAll(ResourceType.PENDING);
+ }
+
+ public Resource getAllUsed() {
+ return _getAll(ResourceType.USED);
+ }
+
+ // Cache Used
+ public Resource getCachedUsed() {
+ return _get(NL, ResourceType.CACHED_USED);
+ }
+
+ public Resource getCachedUsed(String label) {
+ return _get(label, ResourceType.CACHED_USED);
+ }
+
+ public Resource getCachedPending() {
+ return _get(NL, ResourceType.CACHED_PENDING);
+ }
+
+ public Resource getCachedPending(String label) {
+ return _get(label, ResourceType.CACHED_PENDING);
+ }
+
+ public void setCachedUsed(String label, Resource res) {
+ _set(label, ResourceType.CACHED_USED, res);
+ }
+
+ public void setCachedUsed(Resource res) {
+ _set(NL, ResourceType.CACHED_USED, res);
+ }
+
+ public void setCachedPending(String label, Resource res) {
+ _set(label, ResourceType.CACHED_PENDING, res);
+ }
+
+ public void setCachedPending(Resource res) {
+ _set(NL, ResourceType.CACHED_PENDING, res);
+ }
+
/*
* AM-Resource Limit
*/
@@ -316,94 +274,6 @@ public class ResourceUsage {
_set(label, ResourceType.AMLIMIT, res);
}
- private static Resource normalize(Resource res) {
- if (res == null) {
- return Resources.none();
- }
- return res;
- }
-
- private Resource _get(String label, ResourceType type) {
- if (label == null || label.equals(NL)) {
- return normalize(usageNoLabel.resArr[type.idx]);
- }
- try {
- readLock.lock();
- UsageByLabel usage = usages.get(label);
- if (null == usage) {
- return Resources.none();
- }
- return normalize(usage.resArr[type.idx]);
- } finally {
- readLock.unlock();
- }
- }
-
- private Resource _getAll(ResourceType type) {
- try {
- readLock.lock();
- Resource allOfType = Resources.createResource(0);
- for (Map.Entry<String, UsageByLabel> usageEntry : usages.entrySet()) {
- //all usages types are initialized
- Resources.addTo(allOfType, usageEntry.getValue().resArr[type.idx]);
- }
- return allOfType;
- } finally {
- readLock.unlock();
- }
- }
-
- public Resource getAllPending() {
- return _getAll(ResourceType.PENDING);
- }
-
- public Resource getAllUsed() {
- return _getAll(ResourceType.USED);
- }
-
- private UsageByLabel getAndAddIfMissing(String label) {
- if (label == null || label.equals(NL)) {
- return usageNoLabel;
- }
- if (!usages.containsKey(label)) {
- UsageByLabel u = new UsageByLabel(label);
- usages.put(label, u);
- return u;
- }
-
- return usages.get(label);
- }
-
- private void _set(String label, ResourceType type, Resource res) {
- try {
- writeLock.lock();
- UsageByLabel usage = getAndAddIfMissing(label);
- usage.resArr[type.idx] = res;
- } finally {
- writeLock.unlock();
- }
- }
-
- private void _inc(String label, ResourceType type, Resource res) {
- try {
- writeLock.lock();
- UsageByLabel usage = getAndAddIfMissing(label);
- Resources.addTo(usage.resArr[type.idx], res);
- } finally {
- writeLock.unlock();
- }
- }
-
- private void _dec(String label, ResourceType type, Resource res) {
- try {
- writeLock.lock();
- UsageByLabel usage = getAndAddIfMissing(label);
- Resources.subtractFrom(usage.resArr[type.idx], res);
- } finally {
- writeLock.unlock();
- }
- }
-
public Resource getCachedDemand(String label) {
try {
readLock.lock();
@@ -415,23 +285,4 @@ public class ResourceUsage {
readLock.unlock();
}
}
-
- @Override
- public String toString() {
- try {
- readLock.lock();
- return usages.toString();
- } finally {
- readLock.unlock();
- }
- }
-
- public Set<String> getNodePartitionsSet() {
- try {
- readLock.lock();
- return usages.keySet();
- } finally {
- readLock.unlock();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 5fbdead..39ec57a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AbsoluteResourceType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -86,6 +88,7 @@ public abstract class AbstractCSQueue implements CSQueue {
final ResourceCalculator resourceCalculator;
Set<String> accessibleLabels;
+ Set<String> resourceTypes;
final RMNodeLabelsManager labelManager;
String defaultLabelExpression;
@@ -101,6 +104,14 @@ public abstract class AbstractCSQueue implements CSQueue {
// etc.
QueueCapacities queueCapacities;
+ QueueResourceQuotas queueResourceQuotas;
+
+ protected enum CapacityConfigType {
+ NONE, PERCENTAGE, ABSOLUTE_RESOURCE
+ };
+ protected CapacityConfigType capacityConfigType =
+ CapacityConfigType.NONE;
+
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
protected CapacitySchedulerContext csContext;
@@ -138,6 +149,9 @@ public abstract class AbstractCSQueue implements CSQueue {
// initialize QueueCapacities
queueCapacities = new QueueCapacities(parent == null);
+ // initialize queueResourceQuotas
+ queueResourceQuotas = new QueueResourceQuotas();
+
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
@@ -268,6 +282,10 @@ public abstract class AbstractCSQueue implements CSQueue {
this.defaultLabelExpression =
csContext.getConfiguration().getDefaultNodeLabelExpression(
getQueuePath());
+ this.resourceTypes = new HashSet<String>();
+ for (AbsoluteResourceType type : AbsoluteResourceType.values()) {
+ resourceTypes.add(type.toString().toLowerCase());
+ }
// inherit from parent if labels not set
if (this.accessibleLabels == null && parent != null) {
@@ -284,6 +302,11 @@ public abstract class AbstractCSQueue implements CSQueue {
// After we setup labels, we can setup capacities
setupConfigurableCapacities();
+ // Also fetch minimum/maximum resource constraint for this queue if
+ // configured.
+ capacityConfigType = CapacityConfigType.NONE;
+ updateConfigurableResourceRequirement(getQueuePath(), clusterResource);
+
this.maximumAllocation =
csContext.getConfiguration().getMaximumAllocationPerQueue(
getQueuePath());
@@ -356,6 +379,125 @@ public abstract class AbstractCSQueue implements CSQueue {
return unionInheritedWeights;
}
+ protected void updateConfigurableResourceRequirement(String queuePath,
+ Resource clusterResource) {
+ CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+ Set<String> configuredNodelabels = conf.getConfiguredNodeLabels(queuePath);
+
+ for (String label : configuredNodelabels) {
+ Resource minResource = conf.getMinimumResourceRequirement(label,
+ queuePath, resourceTypes);
+ Resource maxResource = conf.getMaximumResourceRequirement(label,
+ queuePath, resourceTypes);
+
+ if (this.capacityConfigType.equals(CapacityConfigType.NONE)) {
+ this.capacityConfigType = (!minResource.equals(Resources.none())
+ && queueCapacities.getAbsoluteCapacity(label) == 0f)
+ ? CapacityConfigType.ABSOLUTE_RESOURCE
+ : CapacityConfigType.PERCENTAGE;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("capacityConfigType is updated as '" + capacityConfigType
+ + "' for queue '" + getQueueName());
+ }
+ }
+
+ validateAbsoluteVsPercentageCapacityConfig(minResource);
+
+ // If min resource for a resource type is greater than its max resource,
+ // throw exception to handle such error configs.
+ if (!maxResource.equals(Resources.none()) && Resources.greaterThan(
+ resourceCalculator, clusterResource, minResource, maxResource)) {
+ throw new IllegalArgumentException("Min resource configuration "
+ + minResource + " is greater than its max value:" + maxResource
+ + " in queue:" + getQueueName());
+ }
+
+ // If parent's max resource is lesser to a specific child's max
+ // resource, throw exception to handle such error configs.
+ if (parent != null) {
+ Resource parentMaxRes = parent.getQueueResourceQuotas()
+ .getConfiguredMaxResource(label);
+ if (Resources.greaterThan(resourceCalculator, clusterResource,
+ parentMaxRes, Resources.none())) {
+ if (Resources.greaterThan(resourceCalculator, clusterResource,
+ maxResource, parentMaxRes)) {
+ throw new IllegalArgumentException("Max resource configuration "
+ + maxResource + " is greater than parents max value:"
+ + parentMaxRes + " in queue:" + getQueueName());
+ }
+
+ // If child's max resource is not set, but its parent max resource is
+ // set, we must set child max resource to its parent's.
+ if (maxResource.equals(Resources.none())
+ && !minResource.equals(Resources.none())) {
+ maxResource = Resources.clone(parentMaxRes);
+ }
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Updating absolute resource configuration for queue:"
+ + getQueueName() + " as minResource=" + minResource
+ + " and maxResource=" + maxResource);
+ }
+
+ queueResourceQuotas.setConfiguredMinResource(label, minResource);
+ queueResourceQuotas.setConfiguredMaxResource(label, maxResource);
+ }
+ }
+
+ private void validateAbsoluteVsPercentageCapacityConfig(
+ Resource minResource) {
+ CapacityConfigType localType = CapacityConfigType.PERCENTAGE;
+ if (!minResource.equals(Resources.none())) {
+ localType = CapacityConfigType.ABSOLUTE_RESOURCE;
+ }
+
+ if (!queueName.equals("root")
+ && !this.capacityConfigType.equals(localType)) {
+ throw new IllegalArgumentException("Queue '" + getQueueName()
+ + "' should use either percentage based capacity"
+ + " configuration or absolute resource.");
+ }
+ }
+
+ @Override
+ public CapacityConfigType getCapacityConfigType() {
+ return capacityConfigType;
+ }
+
+ @Override
+ public Resource getEffectiveCapacity(String label) {
+ return Resources
+ .clone(getQueueResourceQuotas().getEffectiveMinResource(label));
+ }
+
+ @Override
+ public Resource getEffectiveCapacityUp(String label) {
+ return Resources
+ .clone(getQueueResourceQuotas().getEffectiveMinResourceUp(label));
+ }
+
+ @Override
+ public Resource getEffectiveCapacityDown(String label, Resource factor) {
+ return Resources.normalizeDown(resourceCalculator,
+ getQueueResourceQuotas().getEffectiveMinResource(label),
+ minimumAllocation);
+ }
+
+ @Override
+ public Resource getEffectiveMaxCapacity(String label) {
+ return Resources
+ .clone(getQueueResourceQuotas().getEffectiveMaxResource(label));
+ }
+
+ @Override
+ public Resource getEffectiveMaxCapacityDown(String label, Resource factor) {
+ return Resources.normalizeDown(resourceCalculator,
+ getQueueResourceQuotas().getEffectiveMaxResource(label),
+ minimumAllocation);
+ }
+
private void initializeQueueState(QueueState previousState,
QueueState configuredState, QueueState parentState) {
// verify that we can not any value for State other than RUNNING/STOPPED
@@ -547,6 +689,11 @@ public abstract class AbstractCSQueue implements CSQueue {
}
@Override
+ public QueueResourceQuotas getQueueResourceQuotas() {
+ return queueResourceQuotas;
+ }
+
+ @Override
public ReentrantReadWriteLock.ReadLock getReadLock() {
return readLock;
}
@@ -596,7 +743,7 @@ public abstract class AbstractCSQueue implements CSQueue {
* limit-set-by-parent)
*/
Resource queueMaxResource =
- getQueueMaxResource(nodePartition, clusterResource);
+ getQueueMaxResource(nodePartition);
return Resources.min(resourceCalculator, clusterResource,
queueMaxResource, currentResourceLimits.getLimit());
@@ -609,11 +756,8 @@ public abstract class AbstractCSQueue implements CSQueue {
return Resources.none();
}
- Resource getQueueMaxResource(String nodePartition, Resource clusterResource) {
- return Resources.multiplyAndNormalizeDown(resourceCalculator,
- labelManager.getResourceByLabel(nodePartition, clusterResource),
- queueCapacities.getAbsoluteMaximumCapacity(nodePartition),
- minimumAllocation);
+ Resource getQueueMaxResource(String nodePartition) {
+ return getEffectiveMaxCapacity(nodePartition);
}
public boolean hasChildQueues() {
@@ -774,7 +918,7 @@ public abstract class AbstractCSQueue implements CSQueue {
queueUsage.incUsed(nodeLabel, resourceToInc);
CSQueueUtils.updateUsedCapacity(resourceCalculator,
labelManager.getResourceByLabel(nodeLabel, Resources.none()),
- nodeLabel, this);
+ Resources.none(), nodeLabel, this);
if (null != parent) {
parent.incUsedResource(nodeLabel, resourceToInc, null);
}
@@ -790,7 +934,7 @@ public abstract class AbstractCSQueue implements CSQueue {
queueUsage.decUsed(nodeLabel, resourceToDec);
CSQueueUtils.updateUsedCapacity(resourceCalculator,
labelManager.getResourceByLabel(nodeLabel, Resources.none()),
- nodeLabel, this);
+ Resources.none(), nodeLabel, this);
if (null != parent) {
parent.decUsedResource(nodeLabel, resourceToDec, null);
}
@@ -896,7 +1040,7 @@ public abstract class AbstractCSQueue implements CSQueue {
Resource maxResourceLimit;
if (allocation.getSchedulingMode()
== SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
- maxResourceLimit = getQueueMaxResource(partition, cluster);
+ maxResourceLimit = getQueueMaxResource(partition);
} else{
maxResourceLimit = labelManager.getResourceByLabel(
schedulerContainer.getNodePartition(), cluster);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 3a17d1b..a93d74e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -37,21 +37,20 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
/**
* <code>CSQueue</code> represents a node in the tree of
@@ -357,4 +356,41 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
* @return map of usernames and corresponding weight
*/
Map<String, Float> getUserWeights();
+
+ /**
+ * Get QueueResourceQuotas associated with each queue.
+ * @return QueueResourceQuotas
+ */
+ public QueueResourceQuotas getQueueResourceQuotas();
+
+ /**
+ * Get CapacityConfigType as PERCENTAGE or ABSOLUTE_RESOURCE
+ * @return CapacityConfigType
+ */
+ public CapacityConfigType getCapacityConfigType();
+
+ /**
+ * Get effective capacity of queue. If min/max resource is configured,
+ * preference will be given to absolute configuration over normal capacity.
+ * Also round down the result to normalizeDown.
+ *
+ * @param label
+ * partition
+ * @return effective queue capacity
+ */
+ Resource getEffectiveCapacity(String label);
+ Resource getEffectiveCapacityUp(String label);
+ Resource getEffectiveCapacityDown(String label, Resource factor);
+
+ /**
+ * Get effective max capacity of queue. If min/max resource is configured,
+ * preference will be given to absolute configuration over normal capacity.
+ * Also round down the result to normalizeDown.
+ *
+ * @param label
+ * partition
+ * @return effective max queue capacity
+ */
+ Resource getEffectiveMaxCapacity(String label);
+ Resource getEffectiveMaxCapacityDown(String label, Resource factor);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index e1014c1..81dec80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -150,7 +150,7 @@ class CSQueueUtils {
}
}
}
-
+
// Set absolute capacities for {capacity, maximum-capacity}
private static void updateAbsoluteCapacitiesByNodeLabels(
QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) {
@@ -180,8 +180,8 @@ class CSQueueUtils {
* used resource for all partitions of this queue.
*/
public static void updateUsedCapacity(final ResourceCalculator rc,
- final Resource totalPartitionResource, String nodePartition,
- AbstractCSQueue childQueue) {
+ final Resource totalPartitionResource, Resource clusterResource,
+ String nodePartition, AbstractCSQueue childQueue) {
QueueCapacities queueCapacities = childQueue.getQueueCapacities();
CSQueueMetrics queueMetrics = childQueue.getMetrics();
ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage();
@@ -193,11 +193,8 @@ class CSQueueUtils {
if (Resources.greaterThan(rc, totalPartitionResource,
totalPartitionResource, Resources.none())) {
- // queueGuaranteed = totalPartitionedResource *
- // absolute_capacity(partition)
- Resource queueGuranteedResource =
- Resources.multiply(totalPartitionResource,
- queueCapacities.getAbsoluteCapacity(nodePartition));
+ Resource queueGuranteedResource = childQueue
+ .getEffectiveCapacity(nodePartition);
// make queueGuranteed >= minimum_allocation to avoid divided by 0.
queueGuranteedResource =
@@ -248,9 +245,7 @@ class CSQueueUtils {
for (String partition : nodeLabels) {
// Calculate guaranteed resource for a label in a queue by below logic.
// (total label resource) * (absolute capacity of label in that queue)
- Resource queueGuranteedResource = Resources.multiply(nlm
- .getResourceByLabel(partition, cluster), queue.getQueueCapacities()
- .getAbsoluteCapacity(partition));
+ Resource queueGuranteedResource = queue.getEffectiveCapacity(partition);
// Available resource in queue for a specific label will be calculated as
// {(guaranteed resource for a label in a queue) -
@@ -289,15 +284,14 @@ class CSQueueUtils {
ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage();
if (nodePartition == null) {
- for (String partition : Sets.union(
- queueCapacities.getNodePartitionsSet(),
+ for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(),
queueResourceUsage.getNodePartitionsSet())) {
updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster),
- partition, childQueue);
+ cluster, partition, childQueue);
}
} else {
updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster),
- nodePartition, childQueue);
+ cluster, nodePartition, childQueue);
}
// Update queue metrics w.r.t node labels. In a generic way, we can
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 13b9ff6..8cb01ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderi
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -60,6 +61,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.Set;
import java.util.StringTokenizer;
@@ -316,6 +319,21 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = -1;
+ /** Configuring absolute min/max resources in a queue **/
+ @Private
+ public static final String MINIMUM_RESOURCE = "min-resource";
+
+ @Private
+ public static final String MAXIMUM_RESOURCE = "max-resource";
+
+ public static final String DEFAULT_RESOURCE_TYPES = "memory,vcores";
+
+ public static final String PATTERN_FOR_ABSOLUTE_RESOURCE = "\\[([^\\]]+)";
+
+ public enum AbsoluteResourceType {
+ MEMORY, VCORES;
+ }
+
AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser();
public CapacitySchedulerConfiguration() {
@@ -393,7 +411,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public float getNonLabeledQueueCapacity(String queue) {
float capacity = queue.equals("root") ? 100.0f : getFloat(
- getQueuePrefix(queue) + CAPACITY, UNDEFINED);
+ getQueuePrefix(queue) + CAPACITY, 0f);
if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) {
throw new IllegalArgumentException("Illegal " +
"capacity of " + capacity + " for queue " + queue);
@@ -1496,4 +1514,163 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public int getMaxAssignPerHeartbeat() {
return getInt(MAX_ASSIGN_PER_HEARTBEAT, DEFAULT_MAX_ASSIGN_PER_HEARTBEAT);
}
+
+ public static String getUnits(String resourceValue) {
+ String units;
+ for (int i = 0; i < resourceValue.length(); i++) {
+ if (Character.isAlphabetic(resourceValue.charAt(i))) {
+ units = resourceValue.substring(i);
+ if (StringUtils.isAlpha(units)) {
+ return units;
+ }
+ }
+ }
+ return "";
+ }
+
+ /**
+ * Get absolute minimum resource requirement for a queue.
+ *
+ * @param label
+ * NodeLabel
+ * @param queue
+ * queue path
+ * @param resourceTypes
+ * Resource types
+ * @return ResourceInformation
+ */
+ public Resource getMinimumResourceRequirement(String label, String queue,
+ Set<String> resourceTypes) {
+ return internalGetLabeledResourceRequirementForQueue(queue, label,
+ resourceTypes, MINIMUM_RESOURCE);
+ }
+
+ /**
+ * Get absolute maximum resource requirement for a queue.
+ *
+ * @param label
+ * NodeLabel
+ * @param queue
+ * queue path
+ * @param resourceTypes
+ * Resource types
+ * @return Resource
+ */
+ public Resource getMaximumResourceRequirement(String label, String queue,
+ Set<String> resourceTypes) {
+ return internalGetLabeledResourceRequirementForQueue(queue, label,
+ resourceTypes, MAXIMUM_RESOURCE);
+ }
+
+ @VisibleForTesting
+ public void setMinimumResourceRequirement(String label, String queue,
+ Resource resource) {
+ updateMinMaxResourceToConf(label, queue, resource, MINIMUM_RESOURCE);
+ }
+
+ @VisibleForTesting
+ public void setMaximumResourceRequirement(String label, String queue,
+ Resource resource) {
+ updateMinMaxResourceToConf(label, queue, resource, MAXIMUM_RESOURCE);
+ }
+
+ private void updateMinMaxResourceToConf(String label, String queue,
+ Resource resource, String type) {
+ if (queue.equals("root")) {
+ throw new IllegalArgumentException(
+ "Cannot set resource, root queue will take 100% of cluster capacity");
+ }
+
+ StringBuilder resourceString = new StringBuilder();
+ resourceString
+ .append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "="
+ + resource.getMemorySize() + ","
+ + AbsoluteResourceType.VCORES.toString().toLowerCase() + "="
+ + resource.getVirtualCores() + "]");
+
+ String prefix = getQueuePrefix(queue) + type;
+ if (!label.isEmpty()) {
+ prefix = getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label
+ + DOT + type;
+ }
+ set(prefix, resourceString.toString());
+ }
+
+ private Resource internalGetLabeledResourceRequirementForQueue(String queue,
+ String label, Set<String> resourceTypes, String suffix) {
+ String propertyName = getNodeLabelPrefix(queue, label) + suffix;
+ String resourceString = get(propertyName);
+ if (resourceString == null || resourceString.isEmpty()) {
+ return Resources.none();
+ }
+
+ // Define resource here.
+ Resource resource = Resource.newInstance(0l, 0);
+ Matcher matcher = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE)
+ .matcher(resourceString);
+ /*
+ * Absolute resource configuration for a queue will be grouped by "[]".
+ * Syntax of absolute resource config could be like below
+ * "memory=4Gi vcores=2". Ideally this means "4GB of memory and 2 vcores".
+ */
+ if (matcher.find()) {
+ // Get the sub-group.
+ String subGroup = matcher.group(1);
+ if (subGroup.trim().isEmpty()) {
+ return Resources.none();
+ }
+
+ for (String kvPair : subGroup.trim().split(",")) {
+ String[] splits = kvPair.split("=");
+
+ // Ensure that each sub string is key value pair separated by '='.
+ if (splits != null && splits.length > 1) {
+ updateResourceValuesFromConfig(resourceTypes, resource, splits);
+ }
+ }
+ }
+
+ // Memory has to be configured always.
+ if (resource.getMemorySize() == 0l) {
+ return Resources.none();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CSConf - getAbsolueResourcePerQueue: prefix="
+ + getNodeLabelPrefix(queue, label) + ", capacity=" + resource);
+ }
+ return resource;
+ }
+
+ private void updateResourceValuesFromConfig(Set<String> resourceTypes,
+ Resource resource, String[] splits) {
+
+ // If key is not a valid type, skip it.
+ if (!resourceTypes.contains(splits[0])) {
+ return;
+ }
+
+ String units = getUnits(splits[1]);
+ Long resourceValue = Long
+ .valueOf(splits[1].substring(0, splits[1].length() - units.length()));
+
+ // Convert all incoming units to MB if units is configured.
+ if (!units.isEmpty()) {
+ resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue);
+ }
+
+ // map it based on key.
+ AbsoluteResourceType resType = AbsoluteResourceType
+ .valueOf(StringUtils.toUpperCase(splits[0].trim()));
+ switch (resType) {
+ case MEMORY :
+ resource.setMemorySize(resourceValue);
+ break;
+ case VCORES :
+ resource.setVirtualCores(resourceValue.intValue());
+ break;
+ default :
+ break;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 2e502b7..23d5088 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -656,12 +656,7 @@ public class LeafQueue extends AbstractCSQueue {
1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f);
- Resource queuePartitionResource = Resources
- .multiplyAndNormalizeUp(resourceCalculator,
- labelManager.getResourceByLabel(nodePartition,
- lastClusterResource),
- queueCapacities.getAbsoluteCapacity(nodePartition),
- minimumAllocation);
+ Resource queuePartitionResource = getEffectiveCapacityUp(nodePartition);
Resource userAMLimit = Resources.multiplyAndNormalizeUp(
resourceCalculator, queuePartitionResource,
@@ -690,11 +685,7 @@ public class LeafQueue extends AbstractCSQueue {
* non-labeled), * with per-partition am-resource-percent to get the max am
* resource limit for this queue and partition.
*/
- Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
- resourceCalculator,
- labelManager.getResourceByLabel(nodePartition, lastClusterResource),
- queueCapacities.getAbsoluteCapacity(nodePartition),
- minimumAllocation);
+ Resource queuePartitionResource = getEffectiveCapacityUp(nodePartition);
Resource queueCurrentLimit = Resources.none();
// For non-labeled partition, we need to consider the current queue
@@ -950,6 +941,14 @@ public class LeafQueue extends AbstractCSQueue {
private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) {
// Set preemption-allowed:
// For leaf queue, only under-utilized queue is allowed to preempt resources from other queues
+ if (!queueResourceQuotas.getEffectiveMinResource(nodePartition)
+ .equals(Resources.none())) {
+ limits.setIsAllowPreemption(Resources.lessThan(resourceCalculator,
+ csContext.getClusterResource(), queueUsage.getUsed(nodePartition),
+ queueResourceQuotas.getEffectiveMinResource(nodePartition)));
+ return;
+ }
+
float usedCapacity = queueCapacities.getAbsoluteUsedCapacity(nodePartition);
float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition);
limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
@@ -1326,7 +1325,7 @@ public class LeafQueue extends AbstractCSQueue {
currentPartitionResourceLimit =
partition.equals(RMNodeLabelsManager.NO_LABEL)
? currentPartitionResourceLimit
- : getQueueMaxResource(partition, clusterResource);
+ : getQueueMaxResource(partition);
Resource headroom = Resources.componentwiseMin(
Resources.subtract(userLimitResource, user.getUsed(partition)),
@@ -1698,12 +1697,8 @@ public class LeafQueue extends AbstractCSQueue {
// this. So need cap limits by queue's max capacity here.
this.cachedResourceLimitsForHeadroom =
new ResourceLimits(currentResourceLimits.getLimit());
- Resource queueMaxResource =
- Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
- .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
- queueCapacities
- .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
- minimumAllocation);
+ Resource queueMaxResource = getEffectiveMaxCapacityDown(
+ RMNodeLabelsManager.NO_LABEL, minimumAllocation);
this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
resourceCalculator, clusterResource, queueMaxResource,
currentResourceLimits.getLimit()));
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org