You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/08/11 17:32:30 UTC

[50/50] [abbrv] hadoop git commit: YARN-6471. Support to add min/max resource configuration for a queue. (Sunil G via wangda)

YARN-6471. Support to add min/max resource configuration for a queue. (Sunil G via wangda)

Change-Id: I9213f5297a6841fab5c573e85ee4c4e5f4a0b7ff


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/95a81934
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/95a81934
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/95a81934

Branch: refs/heads/YARN-5881
Commit: 95a81934385a1a0f404930b8075e2a066fc6c413
Parents: 4222c97
Author: Wangda Tan <wa...@apache.org>
Authored: Fri Aug 11 10:30:23 2017 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Fri Aug 11 10:30:23 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/util/StringUtils.java     |  31 ++
 .../hadoop/yarn/util/UnitsConversionUtil.java   | 217 ++++++++
 .../resource/DefaultResourceCalculator.java     |   6 +
 .../resource/DominantResourceCalculator.java    |   7 +
 .../yarn/util/resource/ResourceCalculator.java  |  12 +
 .../hadoop/yarn/util/resource/Resources.java    |   5 +
 .../capacity/FifoCandidatesSelector.java        |   9 +-
 .../ProportionalCapacityPreemptionPolicy.java   |  10 +-
 .../monitor/capacity/TempQueuePerPartition.java |  16 +-
 .../scheduler/AbstractResourceUsage.java        | 198 +++++++
 .../scheduler/QueueResourceQuotas.java          | 153 ++++++
 .../scheduler/ResourceUsage.java                | 237 ++-------
 .../scheduler/capacity/AbstractCSQueue.java     | 162 +++++-
 .../scheduler/capacity/CSQueue.java             |  42 +-
 .../scheduler/capacity/CSQueueUtils.java        |  24 +-
 .../CapacitySchedulerConfiguration.java         | 179 ++++++-
 .../scheduler/capacity/LeafQueue.java           |  31 +-
 .../scheduler/capacity/ParentQueue.java         | 203 +++++++-
 .../scheduler/capacity/UsersManager.java        |   5 +-
 .../PriorityUtilizationQueueOrderingPolicy.java |  11 +
 .../webapp/dao/CapacitySchedulerQueueInfo.java  |  15 +
 .../yarn/server/resourcemanager/MockNM.java     |   8 +
 .../yarn/server/resourcemanager/MockRM.java     |   6 +
 ...alCapacityPreemptionPolicyMockFramework.java |  13 +
 ...estProportionalCapacityPreemptionPolicy.java |  29 +-
 ...pacityPreemptionPolicyIntraQueueWithDRF.java |   6 +-
 .../TestAbsoluteResourceConfiguration.java      | 516 +++++++++++++++++++
 .../capacity/TestApplicationLimits.java         |  30 +-
 .../TestApplicationLimitsByPartition.java       |   4 +
 .../capacity/TestCapacityScheduler.java         |   2 +-
 .../scheduler/capacity/TestChildQueueOrder.java |   2 +
 .../scheduler/capacity/TestLeafQueue.java       | 261 ++++------
 .../scheduler/capacity/TestParentQueue.java     |   8 +
 .../scheduler/capacity/TestReservations.java    |  17 +
 ...tPriorityUtilizationQueueOrderingPolicy.java |   3 +
 .../webapp/TestRMWebServicesCapacitySched.java  |   4 +-
 36 files changed, 2046 insertions(+), 436 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
index cda5ec7..1be8a08 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
@@ -1152,4 +1152,35 @@ public class StringUtils {
     return s1.equalsIgnoreCase(s2);
   }
 
+  /**
+   * <p>Checks if the String contains only unicode letters.</p>
+   *
+   * <p><code>null</code> will return <code>false</code>.
+   * An empty String (length()=0) will return <code>true</code>.</p>
+   *
+   * <pre>
+   * StringUtils.isAlpha(null)   = false
+   * StringUtils.isAlpha("")     = true
+   * StringUtils.isAlpha("  ")   = false
+   * StringUtils.isAlpha("abc")  = true
+   * StringUtils.isAlpha("ab2c") = false
+   * StringUtils.isAlpha("ab-c") = false
+   * </pre>
+   *
+   * @param str  the String to check, may be null
+   * @return <code>true</code> if only contains letters, and is non-null
+   */
+  public static boolean isAlpha(String str) {
+    if (str == null) {
+      return false;
+    }
+    final int length = str.length();
+    for (int idx = 0; idx < length; idx++) {
+      if (!Character.isLetter(str.charAt(idx))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
new file mode 100644
index 0000000..79ee0f7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.math.BigInteger;
+import java.util.*;
+
+/**
+ * A util to convert values in one unit to another. Units refers to whether
+ * the value is expressed in pico, nano, etc.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class UnitsConversionUtil {
+
+  /**
+   * Helper class for encapsulating conversion values.
+   */
+  public static class Converter {
+    private long numerator;
+    private long denominator;
+
+    Converter(long n, long d) {
+      this.numerator = n;
+      this.denominator = d;
+    }
+  }
+
+  private static final String[] UNITS =
+      { "p", "n", "u", "m", "", "k", "M", "G", "T", "P", "Ki", "Mi", "Gi", "Ti",
+          "Pi" };
+  private static final List<String> SORTED_UNITS = Arrays.asList(UNITS);
+  public static final Set<String> KNOWN_UNITS = createKnownUnitsSet();
+  private static final Converter PICO =
+      new Converter(1L, 1000L * 1000L * 1000L * 1000L);
+  private static final Converter NANO =
+      new Converter(1L, 1000L * 1000L * 1000L);
+  private static final Converter MICRO = new Converter(1L, 1000L * 1000L);
+  private static final Converter MILLI = new Converter(1L, 1000L);
+  private static final Converter BASE = new Converter(1L, 1L);
+  private static final Converter KILO = new Converter(1000L, 1L);
+  private static final Converter MEGA = new Converter(1000L * 1000L, 1L);
+  private static final Converter GIGA =
+      new Converter(1000L * 1000L * 1000L, 1L);
+  private static final Converter TERA =
+      new Converter(1000L * 1000L * 1000L * 1000L, 1L);
+  private static final Converter PETA =
+      new Converter(1000L * 1000L * 1000L * 1000L * 1000L, 1L);
+
+  private static final Converter KILO_BINARY = new Converter(1024L, 1L);
+  private static final Converter MEGA_BINARY = new Converter(1024L * 1024L, 1L);
+  private static final Converter GIGA_BINARY =
+      new Converter(1024L * 1024L * 1024L, 1L);
+  private static final Converter TERA_BINARY =
+      new Converter(1024L * 1024L * 1024L * 1024L, 1L);
+  private static final Converter PETA_BINARY =
+      new Converter(1024L * 1024L * 1024L * 1024L * 1024L, 1L);
+
+  private static Set<String> createKnownUnitsSet() {
+    Set<String> ret = new HashSet<>();
+    ret.addAll(Arrays.asList(UNITS));
+    return ret;
+  }
+
+  private static Converter getConverter(String unit) {
+    switch (unit) {
+    case "p":
+      return PICO;
+    case "n":
+      return NANO;
+    case "u":
+      return MICRO;
+    case "m":
+      return MILLI;
+    case "":
+      return BASE;
+    case "k":
+      return KILO;
+    case "M":
+      return MEGA;
+    case "G":
+      return GIGA;
+    case "T":
+      return TERA;
+    case "P":
+      return PETA;
+    case "Ki":
+      return KILO_BINARY;
+    case "Mi":
+      return MEGA_BINARY;
+    case "Gi":
+      return GIGA_BINARY;
+    case "Ti":
+      return TERA_BINARY;
+    case "Pi":
+      return PETA_BINARY;
+    default:
+      throw new IllegalArgumentException(
+          "Unknown unit '" + unit + "'. Known units are " + KNOWN_UNITS);
+    }
+  }
+
+  /**
+   * Converts a value from one unit to another. Supported units can be obtained
+   * by inspecting the KNOWN_UNITS set. The scaling is done with arbitrary
+   * precision and the quotient is truncated toward zero, so intermediate
+   * products cannot silently wrap around.
+   *
+   * @param fromUnit  the unit of the from value
+   * @param toUnit    the target unit
+   * @param fromValue the value you wish to convert
+   * @return the value in toUnit
+   * @throws IllegalArgumentException if any argument is null, if the units
+   *           differ and one of them is unknown, or if the converted value
+   *           does not fit in a Long
+   */
+  public static Long convert(String fromUnit, String toUnit, Long fromValue) {
+    if (toUnit == null || fromUnit == null || fromValue == null) {
+      throw new IllegalArgumentException("One or more arguments are null");
+    }
+    if (fromUnit.equals(toUnit)) {
+      return fromValue;
+    }
+    Converter fc = getConverter(fromUnit);
+    Converter tc = getConverter(toUnit);
+    // Exact result: fromValue * (fc.numerator * tc.denominator)
+    //                         / (fc.denominator * tc.numerator).
+    BigInteger scaleUp = BigInteger.valueOf(fc.numerator)
+        .multiply(BigInteger.valueOf(tc.denominator));
+    BigInteger scaleDown = BigInteger.valueOf(fc.denominator)
+        .multiply(BigInteger.valueOf(tc.numerator));
+    BigInteger converted =
+        BigInteger.valueOf(fromValue).multiply(scaleUp).divide(scaleDown);
+    // bitLength() excludes the sign bit: above 63 bits cannot be a long.
+    if (converted.bitLength() > Long.SIZE - 1) {
+      throw new IllegalArgumentException(
+          "Converting " + fromValue + " from '" + fromUnit + "' to '" + toUnit
+              + "' will result in an overflow of Long");
+    }
+    return converted.longValue();
+  }
+
+  /**
+   * Compare a value in a given unit with a value in another unit. The return
+   * value is equivalent to the value returned by compareTo.
+   *
+   * @param unitA  first unit
+   * @param valueA first value
+   * @param unitB  second unit
+   * @param valueB second value
+   * @return +1, 0 or -1 if A is greater than, equal to or less than B
+   * @throws IllegalArgumentException if either unit is null or unknown
+   */
+  public static int compare(String unitA, Long valueA, String unitB,
+      Long valueB) {
+    // Null check only: unknown units must reach the specific checks below.
+    if (unitA == null || unitB == null) {
+      throw new IllegalArgumentException("Units cannot be null");
+    }
+    if (!KNOWN_UNITS.contains(unitA)) {
+      throw new IllegalArgumentException("Unknown unit '" + unitA + "'");
+    }
+    if (!KNOWN_UNITS.contains(unitB)) {
+      throw new IllegalArgumentException("Unknown unit '" + unitB + "'");
+    }
+    Converter unitAC = getConverter(unitA);
+    Converter unitBC = getConverter(unitB);
+    if (unitA.equals(unitB)) {
+      return valueA.compareTo(valueB);
+    }
+    int unitAPos = SORTED_UNITS.indexOf(unitA);
+    int unitBPos = SORTED_UNITS.indexOf(unitB);
+    try {
+      Long tmpA = valueA;
+      Long tmpB = valueB;
+      if (unitAPos < unitBPos) {
+        tmpB = convert(unitB, unitA, valueB);
+      } else {
+        tmpA = convert(unitA, unitB, valueA);
+      }
+      return tmpA.compareTo(tmpB);
+    } catch (IllegalArgumentException ie) {
+      BigInteger tmpA = BigInteger.valueOf(valueA);
+      BigInteger tmpB = BigInteger.valueOf(valueB);
+      if (unitAPos < unitBPos) {
+        tmpB = tmpB.multiply(BigInteger.valueOf(unitBC.numerator));
+        tmpB = tmpB.multiply(BigInteger.valueOf(unitAC.denominator));
+        tmpB = tmpB.divide(BigInteger.valueOf(unitBC.denominator));
+        tmpB = tmpB.divide(BigInteger.valueOf(unitAC.numerator));
+      } else {
+        tmpA = tmpA.multiply(BigInteger.valueOf(unitAC.numerator));
+        tmpA = tmpA.multiply(BigInteger.valueOf(unitBC.denominator));
+        tmpA = tmpA.divide(BigInteger.valueOf(unitAC.denominator));
+        tmpA = tmpA.divide(BigInteger.valueOf(unitBC.numerator));
+      }
+      return tmpA.compareTo(tmpB);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
index bdf60bd..764deaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
@@ -132,4 +132,10 @@ public class DefaultResourceCalculator extends ResourceCalculator {
   public boolean isAnyMajorResourceZero(Resource resource) {
     return resource.getMemorySize() == 0f;
   }
+
+  @Override
+  public Resource normalizeDown(Resource r, Resource stepFactor) {
+    return Resources.createResource(
+        roundDown((r.getMemorySize()), stepFactor.getMemorySize()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 7697e1d..05ddb41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -244,4 +244,11 @@ public class DominantResourceCalculator extends ResourceCalculator {
   public boolean isAnyMajorResourceZero(Resource resource) {
     return resource.getMemorySize() == 0f || resource.getVirtualCores() == 0;
   }
+
+  @Override
+  public Resource normalizeDown(Resource r, Resource stepFactor) {
+    return Resources.createResource(
+        roundDown(r.getMemorySize(), stepFactor.getMemorySize()),
+        roundDown(r.getVirtualCores(), stepFactor.getVirtualCores()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
index 398dac5..013b723 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
@@ -236,4 +236,16 @@ public abstract class ResourceCalculator {
    * @return returns true if any resource is zero.
    */
   public abstract boolean isAnyMajorResourceZero(Resource resource);
+
+  /**
+   * Get resource <code>r</code> and normalize down using step-factor
+   * <code>stepFactor</code>.
+   *
+   * @param r
+   *          resource to be normalized
+   * @param stepFactor
+   *          factor by which to normalize down
+   * @return resulting normalized resource
+   */
+  public abstract Resource normalizeDown(Resource r, Resource stepFactor);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index a1d14fd..3972ec2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -355,4 +355,9 @@ public class Resources {
       Resource resource) {
     return rc.isAnyMajorResourceZero(resource);
   }
+
+  public static Resource normalizeDown(ResourceCalculator calculator,
+      Resource resource, Resource factor) {
+    return calculator.normalizeDown(resource, factor);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index f843db4..748548a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -140,10 +140,10 @@ public class FifoCandidatesSelector
         // Can try preempting AMContainers (still saving atmost
         // maxAMCapacityForThisQueue AMResource's) if more resources are
         // required to be preemptionCandidates from this Queue.
-        Resource maxAMCapacityForThisQueue = Resources.multiply(
-            Resources.multiply(clusterResource,
-                leafQueue.getAbsoluteCapacity()),
-            leafQueue.getMaxAMResourcePerQueuePercent());
+        Resource maxAMCapacityForThisQueue = Resources
+            .multiply(
+                leafQueue.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL),
+                leafQueue.getMaxAMResourcePerQueuePercent());
 
         preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,
             resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
@@ -199,7 +199,6 @@ public class FifoCandidatesSelector
    * Given a target preemption for a specific application, select containers
    * to preempt (after unreserving all reservation for that app).
    */
-  @SuppressWarnings("unchecked")
   private void preemptFrom(FiCaSchedulerApp app,
       Resource clusterResource, Map<String, Resource> resToObtainByPartition,
       List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index fc8ad2b..8b6fa3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolic
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@@ -486,6 +487,13 @@ public class ProportionalCapacityPreemptionPolicy
       float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
       boolean preemptionDisabled = curQueue.getPreemptionDisabled();
 
+      QueueResourceQuotas queueResourceQuotas = curQueue
+          .getQueueResourceQuotas();
+      Resource effMinRes = queueResourceQuotas
+          .getEffectiveMinResource(partitionToLookAt);
+      Resource effMaxRes = queueResourceQuotas
+          .getEffectiveMaxResource(partitionToLookAt);
+
       Resource current = Resources
           .clone(curQueue.getQueueResourceUsage().getUsed(partitionToLookAt));
       Resource killable = Resources.none();
@@ -511,7 +519,7 @@ public class ProportionalCapacityPreemptionPolicy
 
       ret = new TempQueuePerPartition(queueName, current, preemptionDisabled,
           partitionToLookAt, killable, absCap, absMaxCap, partitionResource,
-          reserved, curQueue);
+          reserved, curQueue, effMinRes, effMaxRes);
 
       if (curQueue instanceof ParentQueue) {
         String configuredOrderingPolicy =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 89452f9..bd236fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -48,6 +48,9 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
 
   double normalizedGuarantee;
 
+  private Resource effMinRes;
+  private Resource effMaxRes;
+
   final ArrayList<TempQueuePerPartition> children;
   private Collection<TempAppPerPartition> apps;
   LeafQueue leafQueue;
@@ -68,7 +71,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
   TempQueuePerPartition(String queueName, Resource current,
       boolean preemptionDisabled, String partition, Resource killable,
       float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
-      Resource reserved, CSQueue queue) {
+      Resource reserved, CSQueue queue, Resource effMinRes,
+      Resource effMaxRes) {
     super(queueName, current, Resource.newInstance(0, 0), reserved,
         Resource.newInstance(0, 0));
 
@@ -95,6 +99,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
     this.absCapacity = absCapacity;
     this.absMaxCapacity = absMaxCapacity;
     this.totalPartitionResource = totalPartitionResource;
+    this.effMinRes = effMinRes;
+    this.effMaxRes = effMaxRes;
   }
 
   public void setLeafQueue(LeafQueue l) {
@@ -177,10 +183,18 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
   }
 
   public Resource getGuaranteed() {
+    if(!effMinRes.equals(Resources.none())) {
+      return Resources.clone(effMinRes);
+    }
+
     return Resources.multiply(totalPartitionResource, absCapacity);
   }
 
   public Resource getMax() {
+    if(!effMaxRes.equals(Resources.none())) {
+      return Resources.clone(effMaxRes);
+    }
+
     return Resources.multiply(totalPartitionResource, absMaxCapacity);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java
new file mode 100644
index 0000000..c295323
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * This class can be used to track resource usage in queue/user/app.
+ *
+ * And it is thread-safe
+ */
+public class AbstractResourceUsage {
+  protected ReadLock readLock;
+  protected WriteLock writeLock;
+  protected Map<String, UsageByLabel> usages;
+  // short for no-label :)
+  private static final String NL = CommonNodeLabelsManager.NO_LABEL;
+
+  public AbstractResourceUsage() {
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+
+    usages = new HashMap<String, UsageByLabel>();
+    usages.put(NL, new UsageByLabel(NL));
+  }
+
+  // Enumerates the tracked resource kinds; idx is the slot in UsageByLabel.
+  public enum ResourceType {
+    // CACHED_USED and CACHED_PENDING may be read by anyone, but must only
+    // be written by ordering policies
+    USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4),
+    CACHED_PENDING(5), AMLIMIT(6), MIN_RESOURCE(7), MAX_RESOURCE(8),
+    EFF_MIN_RESOURCE(9), EFF_MAX_RESOURCE(10), EFF_MIN_RESOURCE_UP(11),
+    EFF_MAX_RESOURCE_UP(12);
+
+    private final int idx;
+
+    private ResourceType(int value) {
+      this.idx = value;
+    }
+  }
+
+  /** Per-label holder with one Resource slot for every ResourceType. */
+  public static class UsageByLabel {
+    // Indexed by ResourceType.idx; elements may be replaced through _set().
+    private Resource[] resArr;
+
+    public UsageByLabel(String label) {
+      resArr = new Resource[ResourceType.values().length];
+      for (int i = 0; i < resArr.length; i++) {
+        resArr[i] = Resource.newInstance(0, 0);
+      }
+    }
+
+    public Resource getUsed() {
+      return resArr[ResourceType.USED.idx];
+    }
+
+    @Override
+    public String toString() {
+      return "{used=" + resArr[ResourceType.USED.idx] + "%, "
+          + "pending=" + resArr[ResourceType.PENDING.idx] + "%, "
+          + "am_used=" + resArr[ResourceType.AMUSED.idx] + "%, "
+          + "reserved=" + resArr[ResourceType.RESERVED.idx] + "%, "
+          + "min_eff=" + resArr[ResourceType.EFF_MIN_RESOURCE.idx] + "%, "
+          + "max_eff=" + resArr[ResourceType.EFF_MAX_RESOURCE.idx] + "%, "
+          + "min_effup=" + resArr[ResourceType.EFF_MIN_RESOURCE_UP.idx] + "%, "
+          + "max_effup=" + resArr[ResourceType.EFF_MAX_RESOURCE_UP.idx] + "%}";
+    }
+  }
+
+  private static Resource normalize(Resource res) {
+    if (res == null) {
+      return Resources.none();
+    }
+    return res;
+  }
+
+  protected Resource _get(String label, ResourceType type) {
+    if (label == null) {
+      label = RMNodeLabelsManager.NO_LABEL;
+    }
+
+    try {
+      readLock.lock();
+      UsageByLabel usage = usages.get(label);
+      if (null == usage) {
+        return Resources.none();
+      }
+      return normalize(usage.resArr[type.idx]);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  protected Resource _getAll(ResourceType type) {
+    try {
+      readLock.lock();
+      Resource allOfType = Resources.createResource(0);
+      for (Map.Entry<String, UsageByLabel> usageEntry : usages.entrySet()) {
+        //all usages types are initialized
+        Resources.addTo(allOfType, usageEntry.getValue().resArr[type.idx]);
+      }
+      return allOfType;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private UsageByLabel getAndAddIfMissing(String label) {
+    if (label == null) {
+      label = RMNodeLabelsManager.NO_LABEL;
+    }
+    if (!usages.containsKey(label)) {
+      UsageByLabel u = new UsageByLabel(label);
+      usages.put(label, u);
+      return u;
+    }
+
+    return usages.get(label);
+  }
+
+  protected void _set(String label, ResourceType type, Resource res) {
+    try {
+      writeLock.lock();
+      UsageByLabel usage = getAndAddIfMissing(label);
+      usage.resArr[type.idx] = res;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  protected void _inc(String label, ResourceType type, Resource res) {
+    try {
+      writeLock.lock();
+      UsageByLabel usage = getAndAddIfMissing(label);
+      Resources.addTo(usage.resArr[type.idx], res);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  protected void _dec(String label, ResourceType type, Resource res) {
+    try {
+      writeLock.lock();
+      UsageByLabel usage = getAndAddIfMissing(label);
+      Resources.subtractFrom(usage.resArr[type.idx], res);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public String toString() {
+    try {
+      readLock.lock();
+      return usages.toString();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public Set<String> getNodePartitionsSet() {
+    try {
+      readLock.lock();
+      return usages.keySet();
+    } finally {
+      readLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueResourceQuotas.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueResourceQuotas.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueResourceQuotas.java
new file mode 100644
index 0000000..2e653fc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueResourceQuotas.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+
+/**
+ * QueueResourceQuotas tracks the following quotas per node label:
+ * - configured min/max resource
+ * - effective min/max resource, and the *_UP variant of each
+ * AbstractCSQueue keeps one instance to hold its queue's min/max quotas.
+ *
+ * And it is thread-safe
+ */
+public class QueueResourceQuotas extends AbstractResourceUsage {
+  // short for no-label :)
+  private static final String NL = CommonNodeLabelsManager.NO_LABEL;
+
+  public QueueResourceQuotas() {
+    super();
+  }
+
+  /*
+   * Configured Minimum Resource
+   */
+  public Resource getConfiguredMinResource() {
+    return _get(NL, ResourceType.MIN_RESOURCE);
+  }
+
+  public Resource getConfiguredMinResource(String label) {
+    return _get(label, ResourceType.MIN_RESOURCE);
+  }
+
+  public void setConfiguredMinResource(String label, Resource res) {
+    _set(label, ResourceType.MIN_RESOURCE, res);
+  }
+
+  public void setConfiguredMinResource(Resource res) {
+    _set(NL, ResourceType.MIN_RESOURCE, res);
+  }
+
+  /*
+   * Configured Maximum Resource
+   */
+  public Resource getConfiguredMaxResource() {
+    return getConfiguredMaxResource(NL);
+  }
+
+  public Resource getConfiguredMaxResource(String label) {
+    return _get(label, ResourceType.MAX_RESOURCE);
+  }
+
+  public void setConfiguredMaxResource(Resource res) {
+    setConfiguredMaxResource(NL, res);
+  }
+
+  public void setConfiguredMaxResource(String label, Resource res) {
+    _set(label, ResourceType.MAX_RESOURCE, res);
+  }
+
+  /*
+   * Effective Minimum Resource
+   */
+  public Resource getEffectiveMinResource() {
+    return _get(NL, ResourceType.EFF_MIN_RESOURCE);
+  }
+
+  public Resource getEffectiveMinResource(String label) {
+    return _get(label, ResourceType.EFF_MIN_RESOURCE);
+  }
+
+  public void setEffectiveMinResource(String label, Resource res) {
+    _set(label, ResourceType.EFF_MIN_RESOURCE, res);
+  }
+
+  public void setEffectiveMinResource(Resource res) {
+    _set(NL, ResourceType.EFF_MIN_RESOURCE, res);
+  }
+
+  /*
+   * Effective Maximum Resource
+   */
+  public Resource getEffectiveMaxResource() {
+    return getEffectiveMaxResource(NL);
+  }
+
+  public Resource getEffectiveMaxResource(String label) {
+    return _get(label, ResourceType.EFF_MAX_RESOURCE);
+  }
+
+  public void setEffectiveMaxResource(Resource res) {
+    setEffectiveMaxResource(NL, res);
+  }
+
+  public void setEffectiveMaxResource(String label, Resource res) {
+    _set(label, ResourceType.EFF_MAX_RESOURCE, res);
+  }
+
+  /*
+   * Effective Minimum Resource Up (ResourceType.EFF_MIN_RESOURCE_UP)
+   */
+  public Resource getEffectiveMinResourceUp() {
+    return _get(NL, ResourceType.EFF_MIN_RESOURCE_UP);
+  }
+
+  public Resource getEffectiveMinResourceUp(String label) {
+    return _get(label, ResourceType.EFF_MIN_RESOURCE_UP);
+  }
+
+  public void setEffectiveMinResourceUp(String label, Resource res) {
+    _set(label, ResourceType.EFF_MIN_RESOURCE_UP, res);
+  }
+
+  public void setEffectiveMinResourceUp(Resource res) {
+    _set(NL, ResourceType.EFF_MIN_RESOURCE_UP, res);
+  }
+
+  /*
+   * Effective Maximum Resource Up (ResourceType.EFF_MAX_RESOURCE_UP)
+   */
+  public Resource getEffectiveMaxResourceUp() {
+    return getEffectiveMaxResourceUp(NL);
+  }
+
+  public Resource getEffectiveMaxResourceUp(String label) {
+    return _get(label, ResourceType.EFF_MAX_RESOURCE_UP);
+  }
+
+  public void setEffectiveMaxResourceUp(Resource res) {
+    setEffectiveMaxResourceUp(NL, res);
+  }
+
+  public void setEffectiveMaxResourceUp(String label, Resource res) {
+    _set(label, ResourceType.EFF_MAX_RESOURCE_UP, res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
index 6f0c7d2..ede4aec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
@@ -39,63 +39,12 @@ import org.apache.hadoop.yarn.util.resource.Resources;
  * 
  * And it is thread-safe
  */
-public class ResourceUsage {
-  private ReadLock readLock;
-  private WriteLock writeLock;
-  private Map<String, UsageByLabel> usages;
+public class ResourceUsage extends AbstractResourceUsage {
   // short for no-label :)
   private static final String NL = CommonNodeLabelsManager.NO_LABEL;
-  private final UsageByLabel usageNoLabel;
 
   public ResourceUsage() {
-    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    readLock = lock.readLock();
-    writeLock = lock.writeLock();
-
-    usages = new HashMap<String, UsageByLabel>();
-    usageNoLabel = new UsageByLabel(NL);
-    usages.put(NL, usageNoLabel);
-  }
-
-  // Usage enum here to make implement cleaner
-  private enum ResourceType {
-    //CACHED_USED and CACHED_PENDING may be read by anyone, but must only
-    //be written by ordering policies
-    USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4),
-      CACHED_PENDING(5), AMLIMIT(6);
-
-    private int idx;
-
-    private ResourceType(int value) {
-      this.idx = value;
-    }
-  }
-
-  private static class UsageByLabel {
-    // usage by label, contains all UsageType
-    private Resource[] resArr;
-
-    public UsageByLabel(String label) {
-      resArr = new Resource[ResourceType.values().length];
-      for (int i = 0; i < resArr.length; i++) {
-        resArr[i] = Resource.newInstance(0, 0);
-      };
-    }
-    
-    public Resource getUsed() {
-      return resArr[ResourceType.USED.idx];
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("{used=" + resArr[0] + "%, ");
-      sb.append("pending=" + resArr[1] + "%, ");
-      sb.append("am_used=" + resArr[2] + "%, ");
-      sb.append("reserved=" + resArr[3] + "%}");
-      sb.append("am_limit=" + resArr[6] + "%, ");
-      return sb.toString();
-    }
+    super();
   }
 
   /*
@@ -109,22 +58,6 @@ public class ResourceUsage {
     return _get(label, ResourceType.USED);
   }
 
-  public Resource getCachedUsed() {
-    return _get(NL, ResourceType.CACHED_USED);
-  }
-
-  public Resource getCachedUsed(String label) {
-    return _get(label, ResourceType.CACHED_USED);
-  }
-
-  public Resource getCachedPending() {
-    return _get(NL, ResourceType.CACHED_PENDING);
-  }
-
-  public Resource getCachedPending(String label) {
-    return _get(label, ResourceType.CACHED_PENDING);
-  }
-
   public void incUsed(String label, Resource res) {
     _inc(label, ResourceType.USED, res);
   }
@@ -145,7 +78,7 @@ public class ResourceUsage {
     setUsed(NL, res);
   }
   
-  public void copyAllUsed(ResourceUsage other) {
+  public void copyAllUsed(AbstractResourceUsage other) {
     try {
       writeLock.lock();
       for (Entry<String, UsageByLabel> entry : other.usages.entrySet()) {
@@ -160,22 +93,6 @@ public class ResourceUsage {
     _set(label, ResourceType.USED, res);
   }
 
-  public void setCachedUsed(String label, Resource res) {
-    _set(label, ResourceType.CACHED_USED, res);
-  }
-
-  public void setCachedUsed(Resource res) {
-    _set(NL, ResourceType.CACHED_USED, res);
-  }
-
-  public void setCachedPending(String label, Resource res) {
-    _set(label, ResourceType.CACHED_PENDING, res);
-  }
-
-  public void setCachedPending(Resource res) {
-    _set(NL, ResourceType.CACHED_PENDING, res);
-  }
-
   /*
    * Pending
    */
@@ -281,6 +198,47 @@ public class ResourceUsage {
     _set(label, ResourceType.AMUSED, res);
   }
 
+  public Resource getAllPending() {
+    return _getAll(ResourceType.PENDING);
+  }
+
+  public Resource getAllUsed() {
+    return _getAll(ResourceType.USED);
+  }
+
+  // Cached Used and Cached Pending
+  public Resource getCachedUsed() {
+    return _get(NL, ResourceType.CACHED_USED);
+  }
+
+  public Resource getCachedUsed(String label) {
+    return _get(label, ResourceType.CACHED_USED);
+  }
+
+  public Resource getCachedPending() {
+    return _get(NL, ResourceType.CACHED_PENDING);
+  }
+
+  public Resource getCachedPending(String label) {
+    return _get(label, ResourceType.CACHED_PENDING);
+  }
+
+  public void setCachedUsed(String label, Resource res) {
+    _set(label, ResourceType.CACHED_USED, res);
+  }
+
+  public void setCachedUsed(Resource res) {
+    _set(NL, ResourceType.CACHED_USED, res);
+  }
+
+  public void setCachedPending(String label, Resource res) {
+    _set(label, ResourceType.CACHED_PENDING, res);
+  }
+
+  public void setCachedPending(Resource res) {
+    _set(NL, ResourceType.CACHED_PENDING, res);
+  }
+
   /*
    * AM-Resource Limit
    */
@@ -316,94 +274,6 @@ public class ResourceUsage {
     _set(label, ResourceType.AMLIMIT, res);
   }
 
-  private static Resource normalize(Resource res) {
-    if (res == null) {
-      return Resources.none();
-    }
-    return res;
-  }
-
-  private Resource _get(String label, ResourceType type) {
-    if (label == null || label.equals(NL)) {
-      return normalize(usageNoLabel.resArr[type.idx]);
-    }
-    try {
-      readLock.lock();
-      UsageByLabel usage = usages.get(label);
-      if (null == usage) {
-        return Resources.none();
-      }
-      return normalize(usage.resArr[type.idx]);
-    } finally {
-      readLock.unlock();
-    }
-  }
-  
-  private Resource _getAll(ResourceType type) {
-    try {
-      readLock.lock();
-      Resource allOfType = Resources.createResource(0);
-      for (Map.Entry<String, UsageByLabel> usageEntry : usages.entrySet()) {
-        //all usages types are initialized
-        Resources.addTo(allOfType, usageEntry.getValue().resArr[type.idx]);
-      }
-      return allOfType;
-    } finally {
-      readLock.unlock();
-    }
-  }
-  
-  public Resource getAllPending() {
-    return _getAll(ResourceType.PENDING);
-  }
-  
-  public Resource getAllUsed() {
-    return _getAll(ResourceType.USED);
-  }
-
-  private UsageByLabel getAndAddIfMissing(String label) {
-    if (label == null || label.equals(NL)) {
-      return usageNoLabel;
-    }
-    if (!usages.containsKey(label)) {
-      UsageByLabel u = new UsageByLabel(label);
-      usages.put(label, u);
-      return u;
-    }
-
-    return usages.get(label);
-  }
-
-  private void _set(String label, ResourceType type, Resource res) {
-    try {
-      writeLock.lock();
-      UsageByLabel usage = getAndAddIfMissing(label);
-      usage.resArr[type.idx] = res;
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  private void _inc(String label, ResourceType type, Resource res) {
-    try {
-      writeLock.lock();
-      UsageByLabel usage = getAndAddIfMissing(label);
-      Resources.addTo(usage.resArr[type.idx], res);
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  private void _dec(String label, ResourceType type, Resource res) {
-    try {
-      writeLock.lock();
-      UsageByLabel usage = getAndAddIfMissing(label);
-      Resources.subtractFrom(usage.resArr[type.idx], res);
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
   public Resource getCachedDemand(String label) {
     try {
       readLock.lock();
@@ -415,23 +285,4 @@ public class ResourceUsage {
       readLock.unlock();
     }
   }
-  
-  @Override
-  public String toString() {
-    try {
-      readLock.lock();
-      return usages.toString();
-    } finally {
-      readLock.unlock();
-    }
-  }
-  
-  public Set<String> getNodePartitionsSet() {
-    try {
-      readLock.lock();
-      return usages.keySet();
-    } finally {
-      readLock.unlock();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 5fbdead..39ec57a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AbsoluteResourceType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -86,6 +88,7 @@ public abstract class AbstractCSQueue implements CSQueue {
 
   final ResourceCalculator resourceCalculator;
   Set<String> accessibleLabels;
+  Set<String> resourceTypes;
   final RMNodeLabelsManager labelManager;
   String defaultLabelExpression;
   
@@ -101,6 +104,14 @@ public abstract class AbstractCSQueue implements CSQueue {
   // etc.
   QueueCapacities queueCapacities;
 
+  QueueResourceQuotas queueResourceQuotas;
+
+  protected enum CapacityConfigType {
+    NONE, PERCENTAGE, ABSOLUTE_RESOURCE
+  };
+  protected CapacityConfigType capacityConfigType =
+      CapacityConfigType.NONE;
+
   private final RecordFactory recordFactory = 
       RecordFactoryProvider.getRecordFactory(null);
   protected CapacitySchedulerContext csContext;
@@ -138,6 +149,9 @@ public abstract class AbstractCSQueue implements CSQueue {
     // initialize QueueCapacities
     queueCapacities = new QueueCapacities(parent == null);
 
+    // initialize queueResourceQuotas
+    queueResourceQuotas = new QueueResourceQuotas();
+
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     readLock = lock.readLock();
     writeLock = lock.writeLock();
@@ -268,6 +282,10 @@ public abstract class AbstractCSQueue implements CSQueue {
       this.defaultLabelExpression =
           csContext.getConfiguration().getDefaultNodeLabelExpression(
               getQueuePath());
+      this.resourceTypes = new HashSet<String>();
+      for (AbsoluteResourceType type : AbsoluteResourceType.values()) {
+        resourceTypes.add(type.toString().toLowerCase());
+      }
 
       // inherit from parent if labels not set
       if (this.accessibleLabels == null && parent != null) {
@@ -284,6 +302,11 @@ public abstract class AbstractCSQueue implements CSQueue {
       // After we setup labels, we can setup capacities
       setupConfigurableCapacities();
 
+      // Also fetch minimum/maximum resource constraint for this queue if
+      // configured.
+      capacityConfigType = CapacityConfigType.NONE;
+      updateConfigurableResourceRequirement(getQueuePath(), clusterResource);
+
       this.maximumAllocation =
           csContext.getConfiguration().getMaximumAllocationPerQueue(
               getQueuePath());
@@ -356,6 +379,125 @@ public abstract class AbstractCSQueue implements CSQueue {
     return unionInheritedWeights;
   }
 
+  protected void updateConfigurableResourceRequirement(String queuePath,
+      Resource clusterResource) {
+    CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+    Set<String> configuredNodelabels = conf.getConfiguredNodeLabels(queuePath);
+
+    for (String label : configuredNodelabels) {
+      Resource minResource = conf.getMinimumResourceRequirement(label,
+          queuePath, resourceTypes);
+      Resource maxResource = conf.getMaximumResourceRequirement(label,
+          queuePath, resourceTypes);
+
+      if (this.capacityConfigType.equals(CapacityConfigType.NONE)) {
+        this.capacityConfigType = (!minResource.equals(Resources.none())
+            && queueCapacities.getAbsoluteCapacity(label) == 0f)
+                ? CapacityConfigType.ABSOLUTE_RESOURCE
+                : CapacityConfigType.PERCENTAGE;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("capacityConfigType is updated as '" + capacityConfigType
+              + "' for queue '" + getQueueName());
+        }
+      }
+
+      validateAbsoluteVsPercentageCapacityConfig(minResource);
+
+      // If min resource for a resource type is greater than its max resource,
+      // throw exception to handle such error configs.
+      if (!maxResource.equals(Resources.none()) && Resources.greaterThan(
+          resourceCalculator, clusterResource, minResource, maxResource)) {
+        throw new IllegalArgumentException("Min resource configuration "
+            + minResource + " is greater than its max value:" + maxResource
+            + " in queue:" + getQueueName());
+      }
+
+      // If the parent's max resource is less than this child's max
+      // resource, throw an exception to reject such a misconfiguration.
+      if (parent != null) {
+        Resource parentMaxRes = parent.getQueueResourceQuotas()
+            .getConfiguredMaxResource(label);
+        if (Resources.greaterThan(resourceCalculator, clusterResource,
+            parentMaxRes, Resources.none())) {
+          if (Resources.greaterThan(resourceCalculator, clusterResource,
+              maxResource, parentMaxRes)) {
+            throw new IllegalArgumentException("Max resource configuration "
+                + maxResource + " is greater than parents max value:"
+                + parentMaxRes + " in queue:" + getQueueName());
+          }
+
+          // If child's max resource is not set, but its parent max resource is
+          // set, we must set child max resource to its parent's.
+          if (maxResource.equals(Resources.none())
+              && !minResource.equals(Resources.none())) {
+            maxResource = Resources.clone(parentMaxRes);
+          }
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Updating absolute resource configuration for queue:"
+            + getQueueName() + " as minResource=" + minResource
+            + " and maxResource=" + maxResource);
+      }
+
+      queueResourceQuotas.setConfiguredMinResource(label, minResource);
+      queueResourceQuotas.setConfiguredMaxResource(label, maxResource);
+    }
+  }
+
+  private void validateAbsoluteVsPercentageCapacityConfig(
+      Resource minResource) {
+    CapacityConfigType localType = CapacityConfigType.PERCENTAGE;
+    if (!minResource.equals(Resources.none())) {
+      localType = CapacityConfigType.ABSOLUTE_RESOURCE;
+    }
+
+    if (!queueName.equals("root")
+        && !this.capacityConfigType.equals(localType)) {
+      throw new IllegalArgumentException("Queue '" + getQueueName()
+          + "' should use either percentage based capacity"
+          + " configuration or absolute resource.");
+    }
+  }
+
+  @Override
+  public CapacityConfigType getCapacityConfigType() {
+    return capacityConfigType;
+  }
+
+  @Override
+  public Resource getEffectiveCapacity(String label) {
+    return Resources
+        .clone(getQueueResourceQuotas().getEffectiveMinResource(label));
+  }
+
+  @Override
+  public Resource getEffectiveCapacityUp(String label) {
+    return Resources
+        .clone(getQueueResourceQuotas().getEffectiveMinResourceUp(label));
+  }
+
+  @Override
+  public Resource getEffectiveCapacityDown(String label, Resource factor) {
+    return Resources.normalizeDown(resourceCalculator,
+        getQueueResourceQuotas().getEffectiveMinResource(label),
+        minimumAllocation);
+  }
+
+  @Override
+  public Resource getEffectiveMaxCapacity(String label) {
+    return Resources
+        .clone(getQueueResourceQuotas().getEffectiveMaxResource(label));
+  }
+
+  @Override
+  public Resource getEffectiveMaxCapacityDown(String label, Resource factor) {
+    return Resources.normalizeDown(resourceCalculator,
+        getQueueResourceQuotas().getEffectiveMaxResource(label),
+        minimumAllocation);
+  }
+
   private void initializeQueueState(QueueState previousState,
       QueueState configuredState, QueueState parentState) {
     // verify that we can not any value for State other than RUNNING/STOPPED
@@ -547,6 +689,11 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
 
   @Override
+  public QueueResourceQuotas getQueueResourceQuotas() {
+    return queueResourceQuotas;
+  }
+
+  @Override
   public ReentrantReadWriteLock.ReadLock getReadLock() {
     return readLock;
   }
@@ -596,7 +743,7 @@ public abstract class AbstractCSQueue implements CSQueue {
        * limit-set-by-parent)
        */
       Resource queueMaxResource =
-          getQueueMaxResource(nodePartition, clusterResource);
+          getQueueMaxResource(nodePartition);
 
       return Resources.min(resourceCalculator, clusterResource,
           queueMaxResource, currentResourceLimits.getLimit());
@@ -609,11 +756,8 @@ public abstract class AbstractCSQueue implements CSQueue {
     return Resources.none();
   }
 
-  Resource getQueueMaxResource(String nodePartition, Resource clusterResource) {
-    return Resources.multiplyAndNormalizeDown(resourceCalculator,
-        labelManager.getResourceByLabel(nodePartition, clusterResource),
-        queueCapacities.getAbsoluteMaximumCapacity(nodePartition),
-        minimumAllocation);
+  Resource getQueueMaxResource(String nodePartition) {
+    return getEffectiveMaxCapacity(nodePartition);
   }
 
   public boolean hasChildQueues() {
@@ -774,7 +918,7 @@ public abstract class AbstractCSQueue implements CSQueue {
     queueUsage.incUsed(nodeLabel, resourceToInc);
     CSQueueUtils.updateUsedCapacity(resourceCalculator,
         labelManager.getResourceByLabel(nodeLabel, Resources.none()),
-        nodeLabel, this);
+        Resources.none(), nodeLabel, this);
     if (null != parent) {
       parent.incUsedResource(nodeLabel, resourceToInc, null);
     }
@@ -790,7 +934,7 @@ public abstract class AbstractCSQueue implements CSQueue {
     queueUsage.decUsed(nodeLabel, resourceToDec);
     CSQueueUtils.updateUsedCapacity(resourceCalculator,
         labelManager.getResourceByLabel(nodeLabel, Resources.none()),
-        nodeLabel, this);
+        Resources.none(), nodeLabel, this);
     if (null != parent) {
       parent.decUsedResource(nodeLabel, resourceToDec, null);
     }
@@ -896,7 +1040,7 @@ public abstract class AbstractCSQueue implements CSQueue {
         Resource maxResourceLimit;
         if (allocation.getSchedulingMode()
             == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
-          maxResourceLimit = getQueueMaxResource(partition, cluster);
+          maxResourceLimit = getQueueMaxResource(partition);
         } else{
           maxResourceLimit = labelManager.getResourceByLabel(
               schedulerContainer.getNodePartition(), cluster);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 3a17d1b..a93d74e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -37,21 +37,20 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.security.PrivilegedEntity;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
 
 /**
  * <code>CSQueue</code> represents a node in the tree of 
@@ -357,4 +356,41 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
    * @return map of usernames and corresponding weight
    */
   Map<String, Float> getUserWeights();
+
+  /**
+   * Get QueueResourceQuotas associated with each queue.
+   * @return QueueResourceQuotas
+   */
+  public QueueResourceQuotas getQueueResourceQuotas();
+
+  /**
+   * Get CapacityConfigType: NONE, PERCENTAGE or ABSOLUTE_RESOURCE.
+   * @return CapacityConfigType
+   */
+  public CapacityConfigType getCapacityConfigType();
+
+  /**
+   * Get effective capacity of queue. If min/max resource is configured,
+   * preference will be given to absolute configuration over normal capacity.
+   * getEffectiveCapacityDown returns this value normalized down.
+   *
+   * @param label
+   *          partition
+   * @return effective queue capacity
+   */
+  Resource getEffectiveCapacity(String label);
+  Resource getEffectiveCapacityUp(String label);
+  Resource getEffectiveCapacityDown(String label, Resource factor);
+
+  /**
+   * Get effective max capacity of queue. If min/max resource is configured,
+   * preference will be given to absolute configuration over normal capacity.
+   * getEffectiveMaxCapacityDown returns this value normalized down.
+   *
+   * @param label
+   *          partition
+   * @return effective max queue capacity
+   */
+  Resource getEffectiveMaxCapacity(String label);
+  Resource getEffectiveMaxCapacityDown(String label, Resource factor);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index e1014c1..81dec80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -150,7 +150,7 @@ class CSQueueUtils {
       }
     }
   }
-  
+
   // Set absolute capacities for {capacity, maximum-capacity}
   private static void updateAbsoluteCapacitiesByNodeLabels(
       QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) {
@@ -180,8 +180,8 @@ class CSQueueUtils {
    * used resource for all partitions of this queue.
    */
   public static void updateUsedCapacity(final ResourceCalculator rc,
-      final Resource totalPartitionResource, String nodePartition,
-      AbstractCSQueue childQueue) {
+      final Resource totalPartitionResource, Resource clusterResource,
+      String nodePartition, AbstractCSQueue childQueue) {
     QueueCapacities queueCapacities = childQueue.getQueueCapacities();
     CSQueueMetrics queueMetrics = childQueue.getMetrics();
     ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage();
@@ -193,11 +193,8 @@ class CSQueueUtils {
 
     if (Resources.greaterThan(rc, totalPartitionResource,
         totalPartitionResource, Resources.none())) {
-      // queueGuaranteed = totalPartitionedResource *
-      // absolute_capacity(partition)
-      Resource queueGuranteedResource =
-          Resources.multiply(totalPartitionResource,
-              queueCapacities.getAbsoluteCapacity(nodePartition));
+      Resource queueGuranteedResource = childQueue
+          .getEffectiveCapacity(nodePartition);
 
       // make queueGuranteed >= minimum_allocation to avoid divided by 0.
       queueGuranteedResource =
@@ -248,9 +245,7 @@ class CSQueueUtils {
     for (String partition : nodeLabels) {
       // Calculate guaranteed resource for a label in a queue by below logic.
       // (total label resource) * (absolute capacity of label in that queue)
-      Resource queueGuranteedResource = Resources.multiply(nlm
-          .getResourceByLabel(partition, cluster), queue.getQueueCapacities()
-          .getAbsoluteCapacity(partition));
+      Resource queueGuranteedResource = queue.getEffectiveCapacity(partition);
 
       // Available resource in queue for a specific label will be calculated as
       // {(guaranteed resource for a label in a queue) -
@@ -289,15 +284,14 @@ class CSQueueUtils {
     ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage();
 
     if (nodePartition == null) {
-      for (String partition : Sets.union(
-          queueCapacities.getNodePartitionsSet(),
+      for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(),
           queueResourceUsage.getNodePartitionsSet())) {
         updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster),
-            partition, childQueue);
+            cluster, partition, childQueue);
       }
     } else {
       updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster),
-          nodePartition, childQueue);
+          cluster, nodePartition, childQueue);
     }
 
     // Update queue metrics w.r.t node labels. In a generic way, we can

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 13b9ff6..8cb01ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderi
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -60,6 +61,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.Set;
 import java.util.StringTokenizer;
 
@@ -316,6 +319,21 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   @Private
   public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = -1;
 
+  /** Queue property suffix for the absolute minimum resource of a queue. **/
+  @Private
+  public static final String MINIMUM_RESOURCE = "min-resource";
+  /** Queue property suffix for the absolute maximum resource of a queue. **/
+  @Private
+  public static final String MAXIMUM_RESOURCE = "max-resource";
+  /** Comma-separated resource type names; presumably the default key set. */
+  public static final String DEFAULT_RESOURCE_TYPES = "memory,vcores";
+  /** Regex: a '[' followed by a captured run of one or more non-']' chars. */
+  public static final String PATTERN_FOR_ABSOLUTE_RESOURCE = "\\[([^\\]]+)";
+  /** Resource kinds whose lower-cased names are the keys of the setting. */
+  public enum AbsoluteResourceType {
+    MEMORY, VCORES;
+  }
+
   AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser();
 
   public CapacitySchedulerConfiguration() {
@@ -393,7 +411,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   
   public float getNonLabeledQueueCapacity(String queue) {
     float capacity = queue.equals("root") ? 100.0f : getFloat(
-        getQueuePrefix(queue) + CAPACITY, UNDEFINED);
+        getQueuePrefix(queue) + CAPACITY, 0f);
     if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) {
       throw new IllegalArgumentException("Illegal " +
       		"capacity of " + capacity + " for queue " + queue);
@@ -1496,4 +1514,163 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public int getMaxAssignPerHeartbeat() {
     return getInt(MAX_ASSIGN_PER_HEARTBEAT, DEFAULT_MAX_ASSIGN_PER_HEARTBEAT);
   }
+
+  /**
+   * Returns the first alphabetic-led tail that isAlpha accepts, or "".
+   */
+  public static String getUnits(String resourceValue) {
+    for (int start = 0; start < resourceValue.length(); start++) {
+      boolean letterHere = Character.isAlphabetic(resourceValue.charAt(start));
+      if (letterHere && StringUtils.isAlpha(resourceValue.substring(start))) {
+        return resourceValue.substring(start);
+      }
+    }
+    return "";
+  }
+
+  /**
+   * Get the absolute minimum resource configured for a queue and node label,
+   * read from the property whose suffix is "min-resource".
+   *
+   * @param label node label
+   * @param queue queue path
+   * @param resourceTypes
+   *          resource type names accepted as keys in the configured value
+   * @return the configured Resource, or Resources.none() when the property
+   *         is unset or empty or the parsed memory is 0
+   */
+  public Resource getMinimumResourceRequirement(String label, String queue,
+      Set<String> resourceTypes) {
+    return internalGetLabeledResourceRequirementForQueue(queue, label,
+        resourceTypes, MINIMUM_RESOURCE);
+  }
+
+  /**
+   * Get the absolute maximum resource configured for a queue and node label,
+   * read from the property whose suffix is "max-resource".
+   *
+   * @param label node label
+   * @param queue queue path
+   * @param resourceTypes
+   *          resource type names accepted as keys in the configured value
+   * @return the configured Resource, or Resources.none() when the property
+   *         is unset or empty or the parsed memory is 0
+   */
+  public Resource getMaximumResourceRequirement(String label, String queue,
+      Set<String> resourceTypes) {
+    return internalGetLabeledResourceRequirementForQueue(queue, label,
+        resourceTypes, MAXIMUM_RESOURCE);
+  }
+  /** Test hook: writes the queue/label "min-resource" value; rejects "root". */
+  @VisibleForTesting
+  public void setMinimumResourceRequirement(String label, String queue,
+      Resource resource) {
+    updateMinMaxResourceToConf(label, queue, resource, MINIMUM_RESOURCE);
+  }
+  /** Test hook: writes the queue/label "max-resource" value; rejects "root". */
+  @VisibleForTesting
+  public void setMaximumResourceRequirement(String label, String queue,
+      Resource resource) {
+    updateMinMaxResourceToConf(label, queue, resource, MAXIMUM_RESOURCE);
+  }
+
+  /**
+   * Stores {@code resource} as {@code [memory=N,vcores=M]} under the queue's
+   * (optionally label-scoped) {@code type} property.
+   *
+   * @throws IllegalArgumentException if {@code queue} is "root"
+   */
+  private void updateMinMaxResourceToConf(String label, String queue,
+      Resource resource, String type) {
+    if (queue.equals("root")) {
+      throw new IllegalArgumentException(
+          "Cannot set resource, root queue will take 100% of cluster capacity");
+    }
+
+    String resourceString = "["
+        + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "="
+        + resource.getMemorySize() + ","
+        + AbsoluteResourceType.VCORES.toString().toLowerCase() + "="
+        + resource.getVirtualCores() + "]";
+    // Build the key with the same helper the getters use so both agree.
+    set(getNodeLabelPrefix(queue, label) + type, resourceString);
+  }
+
+  /** Compiled once; {@link Pattern} is immutable and safe to share. */
+  private static final Pattern ABSOLUTE_RESOURCE_PATTERN =
+      Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE);
+
+  /**
+   * Reads the absolute resource stored under a queue/label property. The
+   * value is a bracketed, comma-separated key=value list such as
+   * "[memory=4Gi,vcores=2]"; only the first bracketed group is read.
+   *
+   * @param resourceTypes resource names that may be taken from the value
+   * @param suffix property name appended to the queue/label prefix
+   * @return the parsed resource, or Resources.none() when the property is
+   *         unset or empty or the parsed memory is 0
+   */
+  private Resource internalGetLabeledResourceRequirementForQueue(String queue,
+      String label, Set<String> resourceTypes, String suffix) {
+    String propertyName = getNodeLabelPrefix(queue, label) + suffix;
+    String resourceString = get(propertyName);
+    if (resourceString == null || resourceString.isEmpty()) {
+      return Resources.none();
+    }
+
+    Resource resource = Resource.newInstance(0L, 0);
+    Matcher matcher = ABSOLUTE_RESOURCE_PATTERN.matcher(resourceString);
+    if (matcher.find()) {
+      // group(1) is the text between '[' and the first ']' (or the end).
+      for (String kvPair : matcher.group(1).trim().split(",")) {
+        String[] splits = kvPair.split("=");
+        if (splits.length > 1) {
+          updateResourceValuesFromConfig(resourceTypes, resource, splits);
+        }
+      }
+    }
+
+    // Memory has to be configured always.
+    if (resource.getMemorySize() == 0L) {
+      return Resources.none();
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("CSConf - getAbsolueResourcePerQueue: prefix="
+          + getNodeLabelPrefix(queue, label) + ", capacity=" + resource);
+    }
+    return resource;
+  }
+
+  /**
+   * Applies one key=value pair ({@code splits[0]}, {@code splits[1]}), both
+   * trimmed, to {@code resource}. Keys outside {@code resourceTypes} are
+   * ignored; a value with a unit suffix is converted to "Mi" first.
+   */
+  private void updateResourceValuesFromConfig(Set<String> resourceTypes,
+      Resource resource, String[] splits) {
+    String key = splits[0].trim();
+    String value = splits[1].trim();
+    if (!resourceTypes.contains(key)) {
+      return;
+    }
+
+    String units = getUnits(value);
+    long resourceValue =
+        Long.parseLong(value.substring(0, value.length() - units.length()));
+    if (!units.isEmpty()) {
+      resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue);
+    }
+
+    switch (AbsoluteResourceType.valueOf(StringUtils.toUpperCase(key))) {
+      case MEMORY :
+        resource.setMemorySize(resourceValue);
+        break;
+      case VCORES :
+        resource.setVirtualCores((int) resourceValue);
+        break;
+      default :
+        break;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95a81934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 2e502b7..23d5088 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -656,12 +656,7 @@ public class LeafQueue extends AbstractCSQueue {
           1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
       effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f);
 
-      Resource queuePartitionResource = Resources
-          .multiplyAndNormalizeUp(resourceCalculator,
-              labelManager.getResourceByLabel(nodePartition,
-                  lastClusterResource),
-              queueCapacities.getAbsoluteCapacity(nodePartition),
-              minimumAllocation);
+      Resource queuePartitionResource = getEffectiveCapacityUp(nodePartition);
 
       Resource userAMLimit = Resources.multiplyAndNormalizeUp(
           resourceCalculator, queuePartitionResource,
@@ -690,11 +685,7 @@ public class LeafQueue extends AbstractCSQueue {
        * non-labeled), * with per-partition am-resource-percent to get the max am
        * resource limit for this queue and partition.
        */
-      Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
-          resourceCalculator,
-          labelManager.getResourceByLabel(nodePartition, lastClusterResource),
-          queueCapacities.getAbsoluteCapacity(nodePartition),
-          minimumAllocation);
+      Resource queuePartitionResource = getEffectiveCapacityUp(nodePartition);
 
       Resource queueCurrentLimit = Resources.none();
       // For non-labeled partition, we need to consider the current queue
@@ -950,6 +941,14 @@ public class LeafQueue extends AbstractCSQueue {
   private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) {
     // Set preemption-allowed:
     // For leaf queue, only under-utilized queue is allowed to preempt resources from other queues
+    if (!queueResourceQuotas.getEffectiveMinResource(nodePartition)
+        .equals(Resources.none())) {
+      limits.setIsAllowPreemption(Resources.lessThan(resourceCalculator,
+          csContext.getClusterResource(), queueUsage.getUsed(nodePartition),
+          queueResourceQuotas.getEffectiveMinResource(nodePartition)));
+      return;
+    }
+
     float usedCapacity = queueCapacities.getAbsoluteUsedCapacity(nodePartition);
     float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition);
     limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
@@ -1326,7 +1325,7 @@ public class LeafQueue extends AbstractCSQueue {
     currentPartitionResourceLimit =
         partition.equals(RMNodeLabelsManager.NO_LABEL)
             ? currentPartitionResourceLimit
-            : getQueueMaxResource(partition, clusterResource);
+            : getQueueMaxResource(partition);
 
     Resource headroom = Resources.componentwiseMin(
         Resources.subtract(userLimitResource, user.getUsed(partition)),
@@ -1698,12 +1697,8 @@ public class LeafQueue extends AbstractCSQueue {
     // this. So need cap limits by queue's max capacity here.
     this.cachedResourceLimitsForHeadroom =
         new ResourceLimits(currentResourceLimits.getLimit());
-    Resource queueMaxResource =
-        Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
-            .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
-            queueCapacities
-                .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
-            minimumAllocation);
+    Resource queueMaxResource = getEffectiveMaxCapacityDown(
+        RMNodeLabelsManager.NO_LABEL, minimumAllocation);
     this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
         resourceCalculator, clusterResource, queueMaxResource,
         currentResourceLimits.getLimit()));


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org