You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2020/06/01 17:58:51 UTC

[hadoop] branch branch-2.9 updated: YARN-6492. Generate queue metrics for each partition. Contributed by Manikandan R

This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 72f89aa  YARN-6492. Generate queue metrics for each partition. Contributed by Manikandan R
72f89aa is described below

commit 72f89aa81b351a47defc6255d112592371319f9f
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Mon Jun 1 10:58:17 2020 -0700

    YARN-6492. Generate queue metrics for each partition. Contributed by Manikandan R
---
 .../hadoop/yarn/util/resource/Resources.java       |  13 +
 .../scheduler/AppSchedulingInfo.java               |  28 +-
 .../scheduler/ContainerUpdateContext.java          |  12 +-
 .../scheduler/PartitionQueueMetrics.java           |  89 +++
 .../resourcemanager/scheduler/QueueMetrics.java    | 461 ++++++++++---
 .../scheduler/SchedulerApplicationAttempt.java     |   6 +-
 .../scheduler/capacity/CSQueueMetrics.java         |   7 +-
 .../scheduler/capacity/LeafQueue.java              |  34 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java    |   3 +-
 .../scheduler/fair/FSAppAttempt.java               |   2 +-
 .../scheduler/fifo/FifoAppAttempt.java             |   2 +-
 .../scheduler/TestAppSchedulingInfo.java           |   7 +-
 .../scheduler/TestPartitionQueueMetrics.java       | 752 +++++++++++++++++++++
 .../scheduler/TestSchedulerApplicationAttempt.java |   4 +-
 .../scheduler/capacity/TestLeafQueue.java          |   4 +-
 .../capacity/TestNodeLabelContainerAllocation.java | 405 ++++++++++-
 16 files changed, 1659 insertions(+), 170 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 932fb82..402645f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -192,6 +192,19 @@ public class Resources {
     }
     return lhs;
   }
+  
+  /**
+   * Subtract {@code rhs} from {@code lhs} and reset any negative values to
+   * zero. This call will operate on a copy of {@code lhs}, leaving {@code lhs}
+   * unmodified.
+   *
+   * @param lhs {@link Resource} to subtract from
+   * @param rhs {@link Resource} to subtract
+   * @return a copy of {@code lhs} with {@code rhs} subtracted, floored at zero
+   */
+  public static Resource subtractNonNegative(Resource lhs, Resource rhs) {
+    return subtractFromNonNegative(clone(lhs), rhs);
+  }
 
   public static Resource negate(Resource resource) {
     return subtract(NONE, resource);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 1a6d48f..aebfe30 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -89,10 +90,12 @@ public class AppSchedulingInfo {
   private final ReentrantReadWriteLock.WriteLock writeLock;
 
   public final ContainerUpdateContext updateContext;
+  
+  private final RMContext rmContext;
 
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
       String user, Queue queue, AbstractUsersManager abstractUsersManager,
-      long epoch, ResourceUsage appResourceUsage) {
+      long epoch, ResourceUsage appResourceUsage, RMContext rmContext) {
     this.applicationAttemptId = appAttemptId;
     this.applicationId = appAttemptId.getApplicationId();
     this.queue = queue;
@@ -106,6 +109,7 @@ public class AppSchedulingInfo {
     updateContext = new ContainerUpdateContext(this);
     readLock = lock.readLock();
     writeLock = lock.writeLock();
+    this.rmContext = rmContext;
   }
 
   public ApplicationId getApplicationId() {
@@ -437,7 +441,7 @@ public class AppSchedulingInfo {
 
   public List<ResourceRequest> allocate(NodeType type,
       SchedulerNode node, SchedulerRequestKey schedulerKey,
-      Container containerAllocated) {
+      RMContainer containerAllocated) {
     try {
       writeLock.lock();
 
@@ -593,7 +597,7 @@ public class AppSchedulingInfo {
   }
 
   private void updateMetricsForAllocatedContainer(NodeType type,
-      SchedulerNode node, Container containerAllocated) {
+      SchedulerNode node, RMContainer containerAllocated) {
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
       // once an allocation is done we assume the application is
@@ -604,14 +608,16 @@ public class AppSchedulingInfo {
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("allocate: applicationId=" + applicationId + " container="
-          + containerAllocated.getId() + " host=" + containerAllocated
-          .getNodeId().toString() + " user=" + user + " resource="
-          + containerAllocated.getResource() + " type="
-          + type);
+          + containerAllocated.getContainer().getId() + " host="
+          + containerAllocated.getContainer().getNodeId().toString() + " user="
+          + user + " resource="
+          + containerAllocated.getContainer().getResource() + " type=" + type);
     }
-    if(node != null) {
+    if (node != null) {
       metrics.allocateResources(node.getPartition(), user, 1,
-          containerAllocated.getResource(), true);
+          containerAllocated.getContainer().getResource(), false);
+      metrics.decrPendingResources(containerAllocated.getNodeLabelExpression(),
+          user, 1, containerAllocated.getContainer().getResource());
     }
     metrics.incrNodeTypeAggregations(user, type);
   }
@@ -657,4 +663,8 @@ public class AppSchedulingInfo {
       this.readLock.unlock();
     }
   }
+  
+  public RMContext getRMContext() {
+    return this.rmContext;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
index 5ac2ac5..93f5891 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
@@ -162,10 +162,16 @@ public class ContainerUpdateContext {
       // Decrement the pending using a dummy RR with
       // resource = prev update req capability
       if (prevReq != null) {
+        Container container = Container.newInstance(UNDEFINED,
+            schedulerNode.getNodeID(), "host:port", prevReq.getCapability(),
+            schedulerKey.getPriority(), null);
         appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
-            schedulerKey, Container.newInstance(UNDEFINED,
-                schedulerNode.getNodeID(), "host:port",
-                prevReq.getCapability(), schedulerKey.getPriority(), null));
+            schedulerKey,
+            new RMContainerImpl(container, schedulerKey,
+                appSchedulingInfo.getApplicationAttemptId(),
+                schedulerNode.getNodeID(), appSchedulingInfo.getUser(),
+                appSchedulingInfo.getRMContext(),
+                schedulingPlacementSet.getPrimaryRequestedNodePartition()));
       }
     }
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java
new file mode 100644
index 0000000..75e8380
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+
+@Metrics(context = "yarn")
+public class PartitionQueueMetrics extends QueueMetrics {
+
+  private String partition;
+
+  protected PartitionQueueMetrics(MetricsSystem ms, String queueName,
+      Queue parent, boolean enableUserMetrics, Configuration conf,
+      String partition) {
+    super(ms, queueName, parent, enableUserMetrics, conf);
+    this.partition = partition;
+    if (getParentQueue() != null) {
+      String newQueueName = (getParentQueue() instanceof CSQueue)
+          ? ((CSQueue) getParentQueue()).getQueuePath()
+          : getParentQueue().getQueueName();
+      String parentMetricName =
+          partition + METRIC_NAME_DELIMITER + newQueueName;
+      setParent(getQueueMetrics().get(parentMetricName));
+    }
+  }
+
+  /**
+   * Partition * Queue * User Metrics
+   *
+   * Computes Metrics at Partition (Node Label) * Queue * User Level.
+   *
+   * Sample JMX O/P Structure:
+   *
+   * PartitionQueueMetrics (labelX)
+   *  QueueMetrics (A)
+   *    usermetrics
+   *    QueueMetrics (A1)
+   *      usermetrics
+   *    QueueMetrics (A2)
+   *      usermetrics
+   *  QueueMetrics (B)
+   *    usermetrics
+   *
+   * @return metrics for the user, or null if user metrics are disabled
+   */
+  @Override
+  public synchronized QueueMetrics getUserMetrics(String userName) {
+    if (users == null) {
+      return null;
+    }
+
+    String partitionJMXStr =
+        (partition.equals(DEFAULT_PARTITION)) ? DEFAULT_PARTITION_JMX_STR
+            : partition;
+
+    QueueMetrics metrics = (PartitionQueueMetrics) users.get(userName);
+    if (metrics == null) {
+      metrics = new PartitionQueueMetrics(this.metricsSystem, this.queueName,
+          null, false, this.conf, this.partition);
+      users.put(userName, metrics);
+      metricsSystem.register(
+          pSourceName(partitionJMXStr).append(qSourceName(queueName))
+              .append(",user=").append(userName).toString(),
+          "Metrics for user '" + userName + "' in queue '" + queueName + "'",
+          ((PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partitionJMXStr)
+              .tag(QUEUE_INFO, queueName)).tag(USER_INFO, userName));
+    }
+    return metrics;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
index 4b70502..5506079 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
@@ -50,6 +49,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 
 @InterfaceAudience.Private
@@ -110,23 +110,43 @@ public class QueueMetrics implements MetricsSource {
       info("Queue", "Metrics by queue");
   protected static final MetricsInfo USER_INFO =
       info("User", "Metrics by user");
+  protected static final MetricsInfo PARTITION_INFO = 
+      info("Partition", "Metrics by partition");
   static final Splitter Q_SPLITTER =
       Splitter.on('.').omitEmptyStrings().trimResults();
 
   protected final MetricsRegistry registry;
   protected final String queueName;
-  protected final QueueMetrics parent;
+  private QueueMetrics parent;
+  private final Queue parentQueue;
   protected final MetricsSystem metricsSystem;
   protected final Map<String, QueueMetrics> users;
   protected final Configuration conf;
+  private final boolean enableUserMetrics;
+
+  protected static final MetricsInfo P_RECORD_INFO =
+      info("PartitionQueueMetrics", "Metrics for the resource scheduler");
+
+  // Key used internally for the NO_LABEL (default) partition
+  public static final String DEFAULT_PARTITION = "default";
+
+  // NO_LABEL (default) partition is registered with the metrics system as ""
+  public static final String DEFAULT_PARTITION_JMX_STR = "";
+
+  // Metric Name Delimiter
+  public static final String METRIC_NAME_DELIMITER = ".";
+  
+  public QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
+      boolean enableUserMetrics, Configuration conf) {
 
-  protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, 
-	       boolean enableUserMetrics, Configuration conf) {
     registry = new MetricsRegistry(RECORD_INFO);
     this.queueName = queueName;
+
     this.parent = parent != null ? parent.getMetrics() : null;
-    this.users = enableUserMetrics ? new HashMap<String, QueueMetrics>()
-                                   : null;
+    this.parentQueue = parent;
+    this.users = enableUserMetrics ? new HashMap<String, QueueMetrics>() : null;
+    this.enableUserMetrics = enableUserMetrics;
+
     metricsSystem = ms;
     this.conf = conf;
     runningTime = buildBuckets(conf);
@@ -146,12 +166,25 @@ public class QueueMetrics implements MetricsSource {
     return sb;
   }
 
-  public synchronized
-  static QueueMetrics forQueue(String queueName, Queue parent,
-                               boolean enableUserMetrics,
-			       Configuration conf) {
+  static StringBuilder pSourceName(String partition) {
+    StringBuilder sb = new StringBuilder(P_RECORD_INFO.name());
+    sb.append(",partition").append('=').append(partition);
+    return sb;
+  }
+
+  static StringBuilder qSourceName(String queueName) {
+    StringBuilder sb = new StringBuilder();
+    int i = 0;
+    for (String node : Q_SPLITTER.split(queueName)) {
+      sb.append(",q").append(i++).append('=').append(node);
+    }
+    return sb;
+  }
+
+  public synchronized static QueueMetrics forQueue(String queueName,
+      Queue parent, boolean enableUserMetrics, Configuration conf) {
     return forQueue(DefaultMetricsSystem.instance(), queueName, parent,
-                    enableUserMetrics, conf);
+        enableUserMetrics, conf);
   }
 
   /**
@@ -177,24 +210,20 @@ public class QueueMetrics implements MetricsSource {
     return QUEUE_METRICS;
   }
 
-  public synchronized 
-  static QueueMetrics forQueue(MetricsSystem ms, String queueName,
-                                      Queue parent, boolean enableUserMetrics,
-				      Configuration conf) {
-    QueueMetrics metrics = QUEUE_METRICS.get(queueName);
+  public synchronized static QueueMetrics forQueue(MetricsSystem ms,
+      String queueName, Queue parent, boolean enableUserMetrics,
+      Configuration conf) {
+    QueueMetrics metrics = getQueueMetrics().get(queueName);
     if (metrics == null) {
-      metrics =
-          new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf).
-          tag(QUEUE_INFO, queueName);
-      
+      metrics = new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
+          .tag(QUEUE_INFO, queueName);
+
       // Register with the MetricsSystems
       if (ms != null) {
-        metrics = 
-            ms.register(
-                sourceName(queueName).toString(), 
-                "Metrics for queue: " + queueName, metrics);
+        metrics = ms.register(sourceName(queueName).toString(),
+            "Metrics for queue: " + queueName, metrics);
       }
-      QUEUE_METRICS.put(queueName, metrics);
+      getQueueMetrics().put(queueName, metrics);
     }
 
     return metrics;
@@ -206,7 +235,8 @@ public class QueueMetrics implements MetricsSource {
     }
     QueueMetrics metrics = users.get(userName);
     if (metrics == null) {
-      metrics = new QueueMetrics(metricsSystem, queueName, null, false, conf);
+      metrics =
+          new QueueMetrics(metricsSystem, queueName, null, false, conf);
       users.put(userName, metrics);
       metricsSystem.register(
           sourceName(queueName).append(",user=").append(userName).toString(),
@@ -216,6 +246,96 @@ public class QueueMetrics implements MetricsSource {
     return metrics;
   }
 
+  /**
+   * Partition * Queue Metrics
+   *
+   * Computes Metrics at Partition (Node Label) * Queue Level.
+   *
+   * Sample JMX O/P Structure:
+   *
+   * PartitionQueueMetrics (labelX)
+   *  QueueMetrics (A)
+   *    metrics
+   *    QueueMetrics (A1)
+   *      metrics
+   *    QueueMetrics (A2)
+   *      metrics
+   *  QueueMetrics (B)
+   *    metrics
+   *
+   * @param partition node label; null or NO_LABEL means default partition
+   * @return the metrics of this queue for the given partition
+   */
+  public synchronized QueueMetrics getPartitionQueueMetrics(String partition) {
+
+    String partitionJMXStr = partition;
+
+    if ((partition == null)
+        || (partition.equals(RMNodeLabelsManager.NO_LABEL))) {
+      partition = DEFAULT_PARTITION;
+      partitionJMXStr = DEFAULT_PARTITION_JMX_STR;
+    }
+
+    String metricName = partition + METRIC_NAME_DELIMITER + this.queueName;
+    QueueMetrics metrics = getQueueMetrics().get(metricName);
+
+    if (metrics == null) {
+      QueueMetrics queueMetrics =
+          new PartitionQueueMetrics(metricsSystem, this.queueName, parentQueue,
+              this.enableUserMetrics, this.conf, partition);
+      metricsSystem.register(
+          pSourceName(partitionJMXStr).append(qSourceName(this.queueName))
+              .toString(),
+          "Metrics for queue: " + this.queueName,
+          queueMetrics.tag(PARTITION_INFO, partitionJMXStr).tag(QUEUE_INFO,
+              this.queueName));
+      getQueueMetrics().put(metricName, queueMetrics);
+      return queueMetrics;
+    } else {
+      return metrics;
+    }
+  }
+
+  /**
+   * Partition Metrics
+   *
+   * Computes Metrics at Partition (Node Label) Level.
+   *
+   * Sample JMX O/P Structure:
+   *
+   * PartitionQueueMetrics (labelX)
+   *  metrics
+   *
+   * @param partition node label; null or NO_LABEL means default partition
+   * @return the partition-level metrics, created on first use
+   */
+  private QueueMetrics getPartitionMetrics(String partition) {
+
+    String partitionJMXStr = partition;
+    if ((partition == null)
+        || (partition.equals(RMNodeLabelsManager.NO_LABEL))) {
+      partition = DEFAULT_PARTITION;
+      partitionJMXStr = DEFAULT_PARTITION_JMX_STR;
+    }
+
+    String metricName = partition + METRIC_NAME_DELIMITER;
+    QueueMetrics metrics = getQueueMetrics().get(metricName);
+    if (metrics == null) {
+      metrics = new PartitionQueueMetrics(metricsSystem, this.queueName, null,
+          false, this.conf, partition);
+
+      // Register with the MetricsSystems
+      if (metricsSystem != null) {
+        metricsSystem.register(pSourceName(partitionJMXStr).toString(),
+            "Metrics for partition: " + partitionJMXStr,
+            (PartitionQueueMetrics) metrics.tag(PARTITION_INFO,
+                partitionJMXStr));
+      }
+      getQueueMetrics().put(metricName, metrics);
+    }
+    return metrics;
+  }
+
   private ArrayList<Integer> parseInts(String value) {
     ArrayList<Integer> result = new ArrayList<Integer>();
     for(String s: value.split(",")) {
@@ -355,12 +475,31 @@ public class QueueMetrics implements MetricsSource {
    * @param limit resource limit
    */
   public void setAvailableResourcesToQueue(String partition, Resource limit) {
-    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      availableMB.set(limit.getMemorySize());
-      availableVCores.set(limit.getVirtualCores());
+    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      setAvailableResources(limit);
+    }
+    QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+    if (partitionQueueMetrics != null) {
+      partitionQueueMetrics.setAvailableResources(limit);
+      if (this.queueName.equals("root")) {
+        QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+        if (partitionMetrics != null) {
+          partitionMetrics.setAvailableResources(limit);
+        }
+      }
     }
   }
-
+      
+  /**
+   * Set the available memory (MB) and vcores from the given limit.
+   *
+   * @param limit resource limit
+   */
+  public void setAvailableResources(Resource limit) {
+    availableMB.set(limit.getMemorySize());
+    availableVCores.set(limit.getVirtualCores());
+  }
+      
   /**
    * Set available resources. To be called by scheduler periodically as
    * resources become available.
@@ -382,7 +521,15 @@ public class QueueMetrics implements MetricsSource {
     if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
       QueueMetrics userMetrics = getUserMetrics(user);
       if (userMetrics != null) {
-        userMetrics.setAvailableResourcesToQueue(partition, limit);
+        userMetrics.setAvailableResources(limit);
+      }
+    }
+    QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+    if (partitionQueueMetrics != null) {
+      QueueMetrics partitionUserMetrics =
+          partitionQueueMetrics.getUserMetrics(user);
+      if (partitionUserMetrics != null) {
+        partitionUserMetrics.setAvailableResources(limit);
       }
     }
   }
@@ -397,19 +544,34 @@ public class QueueMetrics implements MetricsSource {
    */
   public void incrPendingResources(String partition, String user,
       int containers, Resource res) {
-    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      _incrPendingResources(containers, res);
-      QueueMetrics userMetrics = getUserMetrics(user);
-      if (userMetrics != null) {
-        userMetrics.incrPendingResources(partition, user, containers, res);
-      }
-      if (parent != null) {
-        parent.incrPendingResources(partition, user, containers, res);
+    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      internalIncrPendingResources(partition, user, containers, res);
+    }
+    QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+    if (partitionQueueMetrics != null) {
+      partitionQueueMetrics.internalIncrPendingResources(partition, user,
+          containers, res);
+      QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+      if (partitionMetrics != null) {
+        partitionMetrics.incrementPendingResources(containers, res);
       }
     }
   }
 
-  private void _incrPendingResources(int containers, Resource res) {
+  public void internalIncrPendingResources(String partition, String user,
+      int containers, Resource res) {
+    incrementPendingResources(containers, res);
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.internalIncrPendingResources(partition, user, containers,
+          res);
+    }
+    if (parent != null) {
+      parent.internalIncrPendingResources(partition, user, containers, res);
+    }
+  }
+    
+  private void incrementPendingResources(int containers, Resource res) {
     pendingContainers.incr(containers);
     pendingMB.incr(res.getMemorySize() * containers);
     pendingVCores.incr(res.getVirtualCores() * containers);
@@ -418,19 +580,34 @@ public class QueueMetrics implements MetricsSource {
 
   public void decrPendingResources(String partition, String user,
       int containers, Resource res) {
-    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      _decrPendingResources(containers, res);
-      QueueMetrics userMetrics = getUserMetrics(user);
-      if (userMetrics != null) {
-        userMetrics.decrPendingResources(partition, user, containers, res);
-      }
-      if (parent != null) {
-        parent.decrPendingResources(partition, user, containers, res);
+    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      internalDecrPendingResources(partition, user, containers, res);
+    }
+    QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+    if (partitionQueueMetrics != null) {
+      partitionQueueMetrics.internalDecrPendingResources(partition, user,
+          containers, res);
+      QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+      if (partitionMetrics != null) {
+        partitionMetrics.decrementPendingResources(containers, res);
       }
     }
   }
+  
+  public void internalDecrPendingResources(String partition, String user,
+      int containers, Resource res) {
+    decrementPendingResources(containers, res);
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.internalDecrPendingResources(partition, user, containers,
+          res);
+    }
+    if (parent != null) {
+      parent.internalDecrPendingResources(partition, user, containers, res);
+    }
+  }
 
-  private void _decrPendingResources(int containers, Resource res) {
+  private void decrementPendingResources(int containers, Resource res) {
     pendingContainers.decr(containers);
     pendingMB.decr(res.getMemorySize() * containers);
     pendingVCores.decr(res.getVirtualCores() * containers);
@@ -455,28 +632,45 @@ public class QueueMetrics implements MetricsSource {
     }
   }
 
-  public void allocateResources(String partition, String user,
-      int containers, Resource res, boolean decrPending) {
-    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      allocatedContainers.incr(containers);
-      aggregateContainersAllocated.incr(containers);
-
-      allocatedMB.incr(res.getMemorySize() * containers);
-      allocatedVCores.incr(res.getVirtualCores() * containers);
-      if (decrPending) {
-        _decrPendingResources(containers, res);
-      }
-      QueueMetrics userMetrics = getUserMetrics(user);
-      if (userMetrics != null) {
-        userMetrics.allocateResources(partition, user,
-            containers, res, decrPending);
-      }
-      if (parent != null) {
-        parent.allocateResources(partition, user, containers, res, decrPending);
+  public void allocateResources(String partition, String user, int containers,
+      Resource res, boolean decrPending) {
+    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      internalAllocateResources(partition, user, containers, res, decrPending);
+    }
+    QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+    if (partitionQueueMetrics != null) {
+      partitionQueueMetrics.internalAllocateResources(partition, user,
+          containers, res, decrPending);
+      QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+      if (partitionMetrics != null) {
+        partitionMetrics.computeAllocateResources(containers, res, decrPending);
       }
     }
   }
 
+  public void internalAllocateResources(String partition, String user,
+      int containers, Resource res, boolean decrPending) {
+    computeAllocateResources(containers, res, decrPending);
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.internalAllocateResources(partition, user, containers, res,
+          decrPending);
+    }
+    if (parent != null) {
+      parent.internalAllocateResources(partition, user, containers, res, decrPending);
+    }
+  }
+
+  private void computeAllocateResources(int containers, Resource res,
+      boolean decrPending) {
+    allocatedContainers.incr(containers);
+    aggregateContainersAllocated.incr(containers);
+    allocatedMB.incr(res.getMemorySize() * containers);
+    allocatedVCores.incr(res.getVirtualCores() * containers);
+    if (decrPending) {
+      decrementPendingResources(containers, res);
+    }
+  }
   /**
    * Allocate Resource for container size change.
    * @param partition Node Partition
@@ -484,40 +678,57 @@ public class QueueMetrics implements MetricsSource {
    * @param res
    */
   public void allocateResources(String partition, String user, Resource res) {
-    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      allocatedMB.incr(res.getMemorySize());
-      allocatedVCores.incr(res.getVirtualCores());
+    allocatedMB.incr(res.getMemorySize());
+    allocatedVCores.incr(res.getVirtualCores());
 
-      pendingMB.decr(res.getMemorySize());
-      pendingVCores.decr(res.getVirtualCores());
+    pendingMB.decr(res.getMemorySize());
+    pendingVCores.decr(res.getVirtualCores());
 
-      QueueMetrics userMetrics = getUserMetrics(user);
-      if (userMetrics != null) {
-        userMetrics.allocateResources(partition, user, res);
-      }
-      if (parent != null) {
-        parent.allocateResources(partition, user, res);
-      }
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.allocateResources(partition, user, res);
+    }
+    if (parent != null) {
+      parent.allocateResources(partition, user, res);
     }
   }
 
-  public void releaseResources(String partition,
-      String user, int containers, Resource res) {
-    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      allocatedContainers.decr(containers);
-      aggregateContainersReleased.incr(containers);
-      allocatedMB.decr(res.getMemorySize() * containers);
-      allocatedVCores.decr(res.getVirtualCores() * containers);
-      QueueMetrics userMetrics = getUserMetrics(user);
-      if (userMetrics != null) {
-        userMetrics.releaseResources(partition, user, containers, res);
-      }
-      if (parent != null) {
-        parent.releaseResources(partition, user, containers, res);
+  public void releaseResources(String partition, String user, int containers,
+      Resource res) {
+    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      internalReleaseResources(partition, user, containers, res);
+    }
+    QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+    if (partitionQueueMetrics != null) {
+      partitionQueueMetrics.internalReleaseResources(partition, user,
+          containers, res);
+      QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+      if (partitionMetrics != null) {
+        partitionMetrics.computeReleaseResources(containers, res);
       }
     }
   }
 
+  public void internalReleaseResources(String partition, String user,
+      int containers, Resource res) {
+    computeReleaseResources(containers, res);
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.internalReleaseResources(partition, user, containers, res);
+    }
+    if (parent != null) {
+      parent.internalReleaseResources(partition, user, containers, res);
+    }
+  }
+
+  private void computeReleaseResources(int containers, Resource res) {
+    allocatedContainers.decr(containers);
+    aggregateContainersReleased.incr(containers);
+    allocatedMB.decr(res.getMemorySize() * containers);
+    allocatedVCores.decr(res.getVirtualCores() * containers);
+  }
+
+
   /**
    * Release Resource for container size change.
    *
@@ -566,42 +777,68 @@ public class QueueMetrics implements MetricsSource {
   }
 
   public void reserveResource(String partition, String user, Resource res) {
-    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      reserveResource(user, res);
+    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      internalReserveResources(partition, user, res);
+    }
+    QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+    if (partitionQueueMetrics != null) {
+      partitionQueueMetrics.internalReserveResources(partition, user, res);
+      QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+      if (partitionMetrics != null) {
+        partitionMetrics.incrReserveResources(res);
+      }
     }
   }
 
-  public void reserveResource(String user, Resource res) {
-    reservedContainers.incr();
-    reservedMB.incr(res.getMemorySize());
-    reservedVCores.incr(res.getVirtualCores());
+  public void internalReserveResources(String partition, String user,
+      Resource res) {
+    incrReserveResources(res);
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
-      userMetrics.reserveResource(user, res);
+      userMetrics.internalReserveResources(partition, user, res);
     }
     if (parent != null) {
-      parent.reserveResource(user, res);
+      parent.internalReserveResources(partition, user, res);
     }
   }
 
-  public void unreserveResource(String user, Resource res) {
-    reservedContainers.decr();
-    reservedMB.decr(res.getMemorySize());
-    reservedVCores.decr(res.getVirtualCores());
+  public void incrReserveResources(Resource res) {
+    reservedContainers.incr();
+    reservedMB.incr(res.getMemorySize());
+    reservedVCores.incr(res.getVirtualCores());
+  }
+
+  public void internalUnReserveResources(String partition, String user,
+      Resource res) {
+    decrReserveResource(user, res);
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
-      userMetrics.unreserveResource(user, res);
+      userMetrics.internalUnReserveResources(partition, user, res);
     }
     if (parent != null) {
-      parent.unreserveResource(user, res);
+      parent.internalUnReserveResources(partition, user, res);
     }
   }
-
+  
   public void unreserveResource(String partition, String user, Resource res) {
     if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      unreserveResource(user, res);
+      internalUnReserveResources(partition, user, res);
+    }
+    QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+    if (partitionQueueMetrics != null) {
+      partitionQueueMetrics.internalUnReserveResources(partition, user, res);
+      QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+      if (partitionMetrics != null) {
+        partitionMetrics.decrReserveResource(user, res);
+      }
     }
   }
+  
+  public void decrReserveResource(String user, Resource res) {
+    reservedContainers.decr();
+    reservedMB.decr(res.getMemorySize());
+    reservedVCores.decr(res.getVirtualCores());
+  }
 
   public void incrActiveUsers() {
     activeUsers.incr();
@@ -755,4 +992,12 @@ public class QueueMetrics implements MetricsSource {
   public long getAggregateVcoresPreempted() {
     return aggregateVcoresPreempted.value();
   }
+
+  public void setParent(QueueMetrics parent) {
+    this.parent = parent;
+  }
+
+  public Queue getParentQueue() {
+    return parentQueue;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 67e565b..46ac2c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -204,9 +204,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       RMContext rmContext) {
     Preconditions.checkNotNull(rmContext, "RMContext should not be null");
     this.rmContext = rmContext;
-    this.appSchedulingInfo = 
-        new AppSchedulingInfo(applicationAttemptId, user, queue,  
-            abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage);
+    this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user,
+        queue, abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage,
+        this.rmContext);
     this.queue = queue;
     this.pendingRelease = Collections.newSetFromMap(
         new ConcurrentHashMap<ContainerId, Boolean>());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java
index 8382665..965cc2b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java
@@ -168,7 +168,7 @@ public class CSQueueMetrics extends QueueMetrics {
   public synchronized static CSQueueMetrics forQueue(String queueName,
       Queue parent, boolean enableUserMetrics, Configuration conf) {
     MetricsSystem ms = DefaultMetricsSystem.instance();
-    QueueMetrics metrics = QueueMetrics.getQueueMetrics().get(queueName);
+    QueueMetrics metrics = getQueueMetrics().get(queueName);
     if (metrics == null) {
       metrics =
           new CSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
@@ -180,7 +180,7 @@ public class CSQueueMetrics extends QueueMetrics {
             ms.register(sourceName(queueName).toString(), "Metrics for queue: "
                 + queueName, metrics);
       }
-      QueueMetrics.getQueueMetrics().put(queueName, metrics);
+      getQueueMetrics().put(queueName, metrics);
     }
 
     return (CSQueueMetrics) metrics;
@@ -193,7 +193,8 @@ public class CSQueueMetrics extends QueueMetrics {
     }
     CSQueueMetrics metrics = (CSQueueMetrics) users.get(userName);
     if (metrics == null) {
-      metrics = new CSQueueMetrics(metricsSystem, queueName, null, false, conf);
+      metrics =
+        new CSQueueMetrics(metricsSystem, queueName, null, false, conf);
       users.put(userName, metrics);
       metricsSystem.register(
           sourceName(queueName).append(",user=").append(userName).toString(),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index c352ba4..076a9f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1383,8 +1383,9 @@ public class LeafQueue extends AbstractCSQueue {
             : getQueueMaxResource(partition, clusterResource);
 
     Resource headroom = Resources.componentwiseMin(
-        Resources.subtract(userLimitResource, user.getUsed(partition)),
-        Resources.subtract(currentPartitionResourceLimit,
+        Resources.subtractNonNegative(userLimitResource,
+            user.getUsed(partition)),
+        Resources.subtractNonNegative(currentPartitionResourceLimit,
             queueUsage.getUsed(partition)));
     // Normalize it before return
     headroom =
@@ -1698,12 +1699,17 @@ public class LeafQueue extends AbstractCSQueue {
       User user = usersManager.updateUserResourceUsage(userName, resource,
           nodePartition, true);
 
-      // Note this is a bit unconventional since it gets the object and modifies
-      // it here, rather then using set routine
-      Resources.subtractFrom(application.getHeadroom(), resource); // headroom
-      metrics.setAvailableResourcesToUser(nodePartition,
-          userName, application.getHeadroom());
-
+      Resource partitionHeadroom = Resources.createResource(0, 0);
+      if (metrics.getUserMetrics(userName) != null) {
+        partitionHeadroom = getHeadroom(user,
+            cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
+            getResourceLimitForActiveUsers(userName, clusterResource,
+                nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
+            nodePartition);
+      }
+      metrics.setAvailableResourcesToUser(nodePartition, userName,
+          partitionHeadroom);
+      
       if (LOG.isDebugEnabled()) {
         LOG.debug(getQueueName() + " user=" + userName + " used="
             + queueUsage.getUsed(nodePartition) + " numContainers="
@@ -1741,8 +1747,16 @@ public class LeafQueue extends AbstractCSQueue {
       User user = usersManager.updateUserResourceUsage(userName, resource,
           nodePartition, false);
 
-      metrics.setAvailableResourcesToUser(nodePartition,
-          userName, application.getHeadroom());
+      Resource partitionHeadroom = Resources.createResource(0, 0);
+      if (metrics.getUserMetrics(userName) != null) {
+        partitionHeadroom = getHeadroom(user,
+            cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
+            getResourceLimitForActiveUsers(userName, clusterResource,
+                nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
+            nodePartition);
+      }
+      metrics.setAvailableResourcesToUser(nodePartition, userName,
+          partitionHeadroom);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 9562eea..5695cf1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -540,8 +540,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
           List<ResourceRequest> requests = appSchedulingInfo.allocate(
               allocation.getAllocationLocalityType(),
               schedulerContainer.getSchedulerNode(),
-              schedulerContainer.getSchedulerRequestKey(),
-              schedulerContainer.getRmContainer().getContainer());
+              schedulerContainer.getSchedulerRequestKey(), rmContainer);
           ((RMContainerImpl) rmContainer).setResourceRequests(requests);
 
           attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 5897914..6e63c87 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -469,7 +469,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
       // Update consumption and track allocations
       List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
-          type, node, schedulerKey, container);
+          type, node, schedulerKey, rmContainer);
       this.attemptResourceUsage.incUsed(container.getResource());
       getQueue().incUsedResource(container.getResource());
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
index 230f66f..e320f2d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
@@ -81,7 +81,7 @@ public class FifoAppAttempt extends FiCaSchedulerApp {
 
       // Update consumption and track allocations
       List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
-          type, node, schedulerKey, container);
+          type, node, schedulerKey, rmContainer);
 
       attemptResourceUsage.incUsed(node.getPartition(),
           container.getResource());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
index bb29889..3afa0ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@@ -47,8 +48,9 @@ public class TestAppSchedulingInfo {
 
     FSLeafQueue queue = mock(FSLeafQueue.class);
     doReturn("test").when(queue).getQueueName();
+    RMContext rmContext = mock(RMContext.class);
     AppSchedulingInfo  appSchedulingInfo = new AppSchedulingInfo(
-        appAttemptId, "test", queue, null, 0, new ResourceUsage());
+        appAttemptId, "test", queue, null, 0, new ResourceUsage(), rmContext);
 
     appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList<String>(),
         new ArrayList<String>());
@@ -118,9 +120,10 @@ public class TestAppSchedulingInfo {
 
     Queue queue = mock(Queue.class);
     doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
+    RMContext rmContext = mock(RMContext.class);
     AppSchedulingInfo  info = new AppSchedulingInfo(
         appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
-        new ResourceUsage());
+        new ResourceUsage(), rmContext);
     Assert.assertEquals(0, info.getSchedulerKeys().size());
 
     Priority pri1 = Priority.newInstance(1);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java
new file mode 100644
index 0000000..eb240d1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java
@@ -0,0 +1,752 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueMetrics;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPartitionQueueMetrics {
+
+  static final int GB = 1024; // MB
+  private static final Configuration CONF = new Configuration();
+
+  private MetricsSystem ms;
+
+  @Before
+  public void setUp() {
+    ms = new MetricsSystemImpl();
+    QueueMetrics.clearQueueMetrics();
+    PartitionQueueMetrics.clearQueueMetrics();
+  }
+
+  @After
+  public void tearDown() {
+    ms.shutdown();
+  }
+
+  /**
+   * Structure:
+   * Both queues, q1 & q2, are configured to run in only 1 partition, x.
+   *
+   * root
+   * / \
+   * q1 q2
+   *
+   * @throws Exception
+   */
+
+  @Test
+  public void testSinglePartitionWithSingleLevelQueueMetrics()
+      throws Exception {
+
+    String parentQueueName = "root";
+    Queue parentQueue = mock(Queue.class);
+    String user = "alice";
+
+    QueueMetrics root = QueueMetrics.forQueue(ms, "root", null, true, CONF);
+    when(parentQueue.getMetrics()).thenReturn(root);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+    QueueMetrics q1 =
+        QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
+    QueueMetrics q2 =
+        QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF);
+
+    q1.submitApp(user);
+    q1.submitAppAttempt(user);
+
+    root.setAvailableResourcesToQueue("x",
+        Resources.createResource(200 * GB, 200));
+    q1.setAvailableResourcesToQueue("x",
+        Resources.createResource(100 * GB, 100));
+
+    q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
+
+    MetricsSource partitionSource = partitionSource(ms, "x");
+    MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
+    MetricsSource q1Source = queueSource(ms, "x", "root.q1");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
+
+    q2.incrPendingResources("x", user, 3, Resource.newInstance(1024, 1));
+    MetricsSource q2Source = queueSource(ms, "x", "root.q2");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
+    checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
+  }
+
+  /**
+   * Structure:
+   * Both queues, q1 & q2, are configured to run in both partitions, x & y.
+   *
+   * root
+   * / \
+   * q1 q2
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testTwoPartitionWithSingleLevelQueueMetrics() throws Exception {
+
+    String parentQueueName = "root";
+    String user = "alice";
+
+    QueueMetrics root =
+        QueueMetrics.forQueue(ms, parentQueueName, null, false, CONF);
+    Queue parentQueue = mock(Queue.class);
+    when(parentQueue.getMetrics()).thenReturn(root);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+
+    QueueMetrics q1 =
+        QueueMetrics.forQueue(ms, "root.q1", parentQueue, false, CONF);
+    QueueMetrics q2 =
+        QueueMetrics.forQueue(ms, "root.q2", parentQueue, false, CONF);
+
+    AppSchedulingInfo app = mockApp(user);
+    q1.submitApp(user);
+    q1.submitAppAttempt(user);
+
+    root.setAvailableResourcesToQueue("x",
+        Resources.createResource(200 * GB, 200));
+    q1.setAvailableResourcesToQueue("x",
+        Resources.createResource(100 * GB, 100));
+
+    q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
+
+    MetricsSource xPartitionSource = partitionSource(ms, "x");
+    MetricsSource xRootQueueSource = queueSource(ms, "x", parentQueueName);
+    MetricsSource q1Source = queueSource(ms, "x", "root.q1");
+
+    checkResources(xPartitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(xRootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
+
+    root.setAvailableResourcesToQueue("y",
+        Resources.createResource(400 * GB, 400));
+    q2.setAvailableResourcesToQueue("y",
+        Resources.createResource(200 * GB, 200));
+
+    q2.incrPendingResources("y", user, 3, Resource.newInstance(1024, 1));
+
+    MetricsSource yPartitionSource = partitionSource(ms, "y");
+    MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName);
+    MetricsSource q2Source = queueSource(ms, "y", "root.q2");
+
+    checkResources(yPartitionSource, 0, 0, 0, 400 * GB, 400, 3 * GB, 3, 3);
+    checkResources(yRootQueueSource, 0, 0, 0, 400 * GB, 400, 3 * GB, 3, 3);
+    checkResources(q2Source, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
+  }
+
+  /**
+   * Structure:
+   * A single queue, q1, is configured to run in multiple partitions, x & y.
+   *
+   * root
+   * /
+   * q1
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testMultiplePartitionWithSingleQueueMetrics() throws Exception {
+
+    String parentQueueName = "root";
+    Queue parentQueue = mock(Queue.class);
+
+    QueueMetrics root =
+        QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
+    when(parentQueue.getMetrics()).thenReturn(root);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+
+    QueueMetrics q1 =
+        QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
+
+    root.setAvailableResourcesToQueue("x",
+        Resources.createResource(200 * GB, 200));
+    root.setAvailableResourcesToQueue("y",
+        Resources.createResource(300 * GB, 300));
+
+    q1.incrPendingResources("x", "test_user", 2, Resource.newInstance(1024, 1));
+
+    MetricsSource partitionSource = partitionSource(ms, "x");
+    MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
+    MetricsSource q1Source = queueSource(ms, "x", "root.q1");
+    MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
+    checkResources(userSource, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
+
+    q1.incrPendingResources("x", "test_user", 3, Resource.newInstance(1024, 1));
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
+    checkResources(q1Source, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
+    checkResources(userSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
+
+    q1.incrPendingResources("x", "test_user1", 4,
+        Resource.newInstance(1024, 1));
+    MetricsSource userSource1 = userSource(ms, "x", "test_user1", "root.q1");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 9 * GB, 9, 9);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 9 * GB, 9, 9);
+    checkResources(q1Source, 0, 0, 0, 0, 0, 9 * GB, 9, 9);
+    checkResources(userSource1, 0, 0, 0, 0, 0, 4 * GB, 4, 4);
+
+    q1.incrPendingResources("y", "test_user1", 6,
+        Resource.newInstance(1024, 1));
+    MetricsSource partitionSourceY = partitionSource(ms, "y");
+    MetricsSource rootQueueSourceY = queueSource(ms, "y", parentQueueName);
+    MetricsSource q1SourceY = queueSource(ms, "y", "root.q1");
+    MetricsSource userSourceY = userSource(ms, "y", "test_user1", "root.q1");
+
+    checkResources(partitionSourceY, 0, 0, 0, 300 * GB, 300, 6 * GB, 6, 6);
+    checkResources(rootQueueSourceY, 0, 0, 0, 300 * GB, 300, 6 * GB, 6, 6);
+    checkResources(q1SourceY, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
+    checkResources(userSourceY, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
+  }
+
+  /**
+   * Structure:
+   * Both queues, q1 & q2, are configured to run in both partitions, x & y.
+   *
+   * root
+   * / \
+   * q1 q2
+   * q1
+   * / \
+   * q11 q12
+   * q2
+   * / \
+   * q21 q22
+   *
+   * @throws Exception
+   */
+
+  @Test
+  public void testMultiplePartitionsWithMultiLevelQueuesMetrics()
+      throws Exception {
+
+    String parentQueueName = "root";
+    Queue parentQueue = mock(Queue.class);
+
+    QueueMetrics root =
+        QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+    when(parentQueue.getMetrics()).thenReturn(root);
+
+    QueueMetrics q1 =
+        QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
+    Queue childQueue1 = mock(Queue.class);
+    when(childQueue1.getQueueName()).thenReturn("root.q1");
+    when(childQueue1.getMetrics()).thenReturn(q1);
+
+    QueueMetrics q11 =
+        QueueMetrics.forQueue(ms, "root.q1.q11", childQueue1, true, CONF);
+    QueueMetrics q12 =
+        QueueMetrics.forQueue(ms, "root.q1.q12", childQueue1, true, CONF);
+
+    QueueMetrics q2 =
+        QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF);
+    Queue childQueue2 = mock(Queue.class);
+    when(childQueue2.getQueueName()).thenReturn("root.q2");
+    when(childQueue2.getMetrics()).thenReturn(q2);
+
+    QueueMetrics q21 =
+        QueueMetrics.forQueue(ms, "root.q2.q21", childQueue2, true, CONF);
+    QueueMetrics q22 =
+        QueueMetrics.forQueue(ms, "root.q2.q22", childQueue2, true, CONF);
+
+    root.setAvailableResourcesToQueue("x",
+        Resources.createResource(200 * GB, 200));
+
+    q1.setAvailableResourcesToQueue("x",
+        Resources.createResource(100 * GB, 100));
+    q11.setAvailableResourcesToQueue("x",
+        Resources.createResource(50 * GB, 50));
+
+    q11.incrPendingResources("x", "test_user", 2,
+        Resource.newInstance(1024, 1));
+
+    MetricsSource partitionSource = partitionSource(ms, "x");
+    MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
+    MetricsSource q1Source = queueSource(ms, "x", "root.q1");
+    MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
+    checkResources(userSource, 0, 0, 0, 0 * GB, 0, 2 * GB, 2, 2);
+
+    q11.incrPendingResources("x", "test_user", 4,
+        Resource.newInstance(1024, 1));
+
+    MetricsSource q11Source = queueSource(ms, "x", "root.q1.q11");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 6 * GB, 6, 6);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 6 * GB, 6, 6);
+    checkResources(q11Source, 0, 0, 0, 50 * GB, 50, 6 * GB, 6, 6);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 6 * GB, 6, 6);
+    checkResources(userSource, 0, 0, 0, 0 * GB, 0, 6 * GB, 6, 6);
+
+    q11.incrPendingResources("x", "test_user1", 5,
+        Resource.newInstance(1024, 1));
+
+    MetricsSource q1UserSource1 = userSource(ms, "x", "test_user1", "root.q1");
+    MetricsSource userSource1 =
+        userSource(ms, "x", "test_user1", "root.q1.q11");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 11 * GB, 11, 11);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 11 * GB, 11, 11);
+    checkResources(q11Source, 0, 0, 0, 50 * GB, 50, 11 * GB, 11, 11);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 11 * GB, 11, 11);
+    checkResources(userSource, 0, 0, 0, 0 * GB, 0, 6 * GB, 6, 6);
+    checkResources(q1UserSource1, 0, 0, 0, 0 * GB, 0, 5 * GB, 5, 5);
+    checkResources(userSource1, 0, 0, 0, 0 * GB, 0, 5 * GB, 5, 5);
+
+    q12.incrPendingResources("x", "test_user", 5,
+        Resource.newInstance(1024, 1));
+    MetricsSource q12Source = queueSource(ms, "x", "root.q1.q12");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 16 * GB, 16, 16);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 16 * GB, 16, 16);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 16 * GB, 16, 16);
+    checkResources(q12Source, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
+
+    root.setAvailableResourcesToQueue("y",
+        Resources.createResource(200 * GB, 200));
+    q1.setAvailableResourcesToQueue("y",
+        Resources.createResource(100 * GB, 100));
+    q12.setAvailableResourcesToQueue("y",
+        Resources.createResource(50 * GB, 50));
+
+    q12.incrPendingResources("y", "test_user", 3,
+        Resource.newInstance(1024, 1));
+
+    MetricsSource yPartitionSource = partitionSource(ms, "y");
+    MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName);
+    MetricsSource q1YSource = queueSource(ms, "y", "root.q1");
+    MetricsSource q12YSource = queueSource(ms, "y", "root.q1.q12");
+
+    checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
+    checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
+    checkResources(q1YSource, 0, 0, 0, 100 * GB, 100, 3 * GB, 3, 3);
+    checkResources(q12YSource, 0, 0, 0, 50 * GB, 50, 3 * GB, 3, 3);
+
+    root.setAvailableResourcesToQueue("y",
+        Resources.createResource(200 * GB, 200));
+    q2.setAvailableResourcesToQueue("y",
+        Resources.createResource(100 * GB, 100));
+    q21.setAvailableResourcesToQueue("y",
+        Resources.createResource(50 * GB, 50));
+
+    q21.incrPendingResources("y", "test_user", 5,
+        Resource.newInstance(1024, 1));
+    MetricsSource q21Source = queueSource(ms, "y", "root.q2.q21");
+    MetricsSource q2YSource = queueSource(ms, "y", "root.q2");
+
+    checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 8 * GB, 8, 8);
+    checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 8 * GB, 8, 8);
+    checkResources(q2YSource, 0, 0, 0, 100 * GB, 100, 5 * GB, 5, 5);
+    checkResources(q21Source, 0, 0, 0, 50 * GB, 50, 5 * GB, 5, 5);
+
+    q22.incrPendingResources("y", "test_user", 6,
+        Resource.newInstance(1024, 1));
+    MetricsSource q22Source = queueSource(ms, "y", "root.q2.q22");
+
+    checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 14 * GB, 14, 14);
+    checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 14 * GB, 14, 14);
+    checkResources(q22Source, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
+  }
+
+  /**
+   * Walks a single application of user "alice" through a root -> root.leaf
+   * hierarchy in partition x with user metrics enabled, and asserts the
+   * partition, queue and user sources after each step: pending, allocate,
+   * reserve, release and unreserve.
+   */
+  @Test
+  public void testTwoLevelWithUserMetrics() {
+    String parentQueueName = "root";
+    String leafQueueName = "root.leaf";
+    String user = "alice";
+    String partition = "x";
+
+    // The parent is a mocked Queue that hands out a real QueueMetrics.
+    QueueMetrics parentMetrics =
+        QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
+    Queue parentQueue = mock(Queue.class);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+    when(parentQueue.getMetrics()).thenReturn(parentMetrics);
+    QueueMetrics metrics =
+        QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, CONF);
+    AppSchedulingInfo app = mockApp(user);
+
+    metrics.submitApp(user);
+    metrics.submitAppAttempt(user);
+
+    // Available resources are set explicitly on both levels, for the queue
+    // and for the user, before any pending demand is recorded.
+    parentMetrics.setAvailableResourcesToQueue(partition,
+        Resources.createResource(100 * GB, 100));
+    metrics.setAvailableResourcesToQueue(partition,
+        Resources.createResource(100 * GB, 100));
+    parentMetrics.setAvailableResourcesToUser(partition, user,
+        Resources.createResource(10 * GB, 10));
+    metrics.setAvailableResourcesToUser(partition, user,
+        Resources.createResource(10 * GB, 10));
+    // 6 containers of 3 GB / 3 vcores each; 18 GB / 18 vcores is asserted.
+    metrics.incrPendingResources(partition, user, 6,
+        Resources.createResource(3 * GB, 3));
+
+    MetricsSource partitionSource = partitionSource(ms, partition);
+    MetricsSource parentQueueSource =
+        queueSource(ms, partition, parentQueueName);
+    MetricsSource queueSource = queueSource(ms, partition, leafQueueName);
+    MetricsSource userSource = userSource(ms, partition, user, leafQueueName);
+    MetricsSource userSource1 =
+        userSource(ms, partition, user, parentQueueName);
+
+    // Pending only: nothing allocated or reserved yet.
+    checkResources(queueSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, 6, 0,
+        0, 0);
+    checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18,
+        6, 0, 0, 0);
+    checkResources(userSource, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0, 0,
+        0);
+    checkResources(userSource1, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0,
+        0, 0);
+    checkResources(partitionSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18,
+        6, 0, 0, 0);
+
+    metrics.runAppAttempt(app.getApplicationId(), user);
+
+    // Allocate 3 containers of 1 GB / 1 vcore and reserve one 3 GB / 3 vcore
+    // container.
+    metrics.allocateResources(partition, user, 3,
+        Resources.createResource(1 * GB, 1), true);
+    metrics.reserveResource(partition, user,
+        Resources.createResource(3 * GB, 3));
+
+    // Available resources is set externally, as it depends on dynamic
+    // configurable cluster/queue resources
+    checkResources(queueSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, 15 * GB, 15,
+        3, 3 * GB, 3, 1);
+    checkResources(parentQueueSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100,
+        15 * GB, 15, 3, 3 * GB, 3, 1);
+    checkResources(partitionSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, 15 * GB,
+        15, 3, 3 * GB, 3, 1);
+    checkResources(userSource, 3 * GB, 3, 3, 3, 0, 10 * GB, 10, 15 * GB, 15, 3,
+        3 * GB, 3, 1);
+    checkResources(userSource1, 3 * GB, 3, 3, 3, 0, 10 * GB, 10, 15 * GB, 15, 3,
+        3 * GB, 3, 1);
+
+    // Second allocation of 3 containers; only the two queue sources are
+    // asserted at this step.
+    metrics.allocateResources(partition, user, 3,
+        Resources.createResource(1 * GB, 1), true);
+
+    checkResources(queueSource, 6 * GB, 6, 6, 6, 0, 100 * GB, 100, 12 * GB, 12,
+        0, 3 * GB, 3, 1);
+    checkResources(parentQueueSource, 6 * GB, 6, 6, 6, 0, 100 * GB, 100,
+        12 * GB, 12, 0, 3 * GB, 3, 1);
+
+    // Release one 2 GB / 2 vcore container and drop the reservation.
+    metrics.releaseResources(partition, user, 1,
+        Resources.createResource(2 * GB, 2));
+    metrics.unreserveResource(partition, user,
+        Resources.createResource(3 * GB, 3));
+    checkResources(queueSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, 12 * GB, 12,
+        0, 0, 0, 0);
+    checkResources(parentQueueSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100,
+        12 * GB, 12, 0, 0, 0, 0);
+    checkResources(partitionSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, 12 * GB,
+        12, 0, 0, 0, 0);
+    checkResources(userSource, 4 * GB, 4, 5, 6, 1, 10 * GB, 10, 12 * GB, 12, 0,
+        0, 0, 0);
+    checkResources(userSource1, 4 * GB, 4, 5, 6, 1, 10 * GB, 10, 12 * GB, 12, 0,
+        0, 0, 0);
+
+    // Finish the attempt and the application; nothing is asserted afterwards.
+    metrics.finishAppAttempt(app.getApplicationId(), app.isPending(),
+        app.getUser());
+
+    metrics.finishApp(user, RMAppState.FINISHED);
+  }
+
+  /**
+   * Exercises a three-level hierarchy (root -> root.leaf -> root.leaf.leaf1)
+   * in partitions x and y with user metrics enabled. Pending resources are
+   * added on the deepest queue only, and the partition, queue and user
+   * sources of every level are then asserted for both partitions.
+   */
+  @Test
+  public void testThreeLevelWithUserMetrics() {
+    String rootName = "root";
+    String midName = "root.leaf";
+    String leafName = "root.leaf.leaf1";
+    String user = "alice";
+    String partX = "x";
+    String partY = "y";
+
+    // Real QueueMetrics objects chained through mocked Queue parents.
+    QueueMetrics rootMetrics =
+        QueueMetrics.forQueue(rootName, null, true, CONF);
+    Queue rootQueue = mock(Queue.class);
+    when(rootQueue.getQueueName()).thenReturn(rootName);
+    when(rootQueue.getMetrics()).thenReturn(rootMetrics);
+    QueueMetrics midMetrics =
+        QueueMetrics.forQueue(midName, rootQueue, true, CONF);
+    Queue midQueue = mock(Queue.class);
+    when(midQueue.getQueueName()).thenReturn(midName);
+    when(midQueue.getMetrics()).thenReturn(midMetrics);
+    QueueMetrics leafMetrics =
+        QueueMetrics.forQueue(leafName, midQueue, true, CONF);
+    AppSchedulingInfo app = mockApp(user);
+
+    leafMetrics.submitApp(user);
+    leafMetrics.submitAppAttempt(user);
+
+    // Queue-level available resources for every level and both partitions.
+    rootMetrics.setAvailableResourcesToQueue(partX,
+        Resources.createResource(200 * GB, 200));
+    rootMetrics.setAvailableResourcesToQueue(partY,
+        Resources.createResource(500 * GB, 500));
+    midMetrics.setAvailableResourcesToQueue(partX,
+        Resources.createResource(100 * GB, 100));
+    midMetrics.setAvailableResourcesToQueue(partY,
+        Resources.createResource(400 * GB, 400));
+    leafMetrics.setAvailableResourcesToQueue(partX,
+        Resources.createResource(50 * GB, 50));
+    leafMetrics.setAvailableResourcesToQueue(partY,
+        Resources.createResource(300 * GB, 300));
+
+    // User-level available resources for every level and both partitions.
+    rootMetrics.setAvailableResourcesToUser(partX, user,
+        Resources.createResource(20 * GB, 20));
+    rootMetrics.setAvailableResourcesToUser(partY, user,
+        Resources.createResource(50 * GB, 50));
+    midMetrics.setAvailableResourcesToUser(partX, user,
+        Resources.createResource(10 * GB, 10));
+    midMetrics.setAvailableResourcesToUser(partY, user,
+        Resources.createResource(40 * GB, 40));
+    leafMetrics.setAvailableResourcesToUser(partX, user,
+        Resources.createResource(5 * GB, 5));
+    leafMetrics.setAvailableResourcesToUser(partY, user,
+        Resources.createResource(30 * GB, 30));
+
+    // Six pending containers per partition, recorded on the deepest queue.
+    leafMetrics.incrPendingResources(partX, user, 6,
+        Resources.createResource(3 * GB, 3));
+    leafMetrics.incrPendingResources(partY, user, 6,
+        Resources.createResource(4 * GB, 4));
+
+    // All lookups go through the metrics system of the deepest queue.
+    MetricsSystem leafMs = leafMetrics.getMetricsSystem();
+
+    // Partition x sources.
+    MetricsSource xPartition = partitionSource(leafMs, partX);
+    MetricsSource xRootQueue = queueSource(leafMs, partX, rootName);
+    MetricsSource xMidQueue = queueSource(leafMs, partX, midName);
+    MetricsSource xLeafQueue = queueSource(leafMs, partX, leafName);
+    MetricsSource xRootUser = userSource(leafMs, partX, user, rootName);
+    MetricsSource xMidUser = userSource(leafMs, partX, user, midName);
+    MetricsSource xLeafUser = userSource(leafMs, partX, user, leafName);
+
+    checkResources(xPartition, 0, 0, 0, 0, 0, 200 * GB, 200,
+        18 * GB, 18, 6, 0, 0, 0);
+    checkResources(xRootQueue, 0, 0, 0, 0, 0, 200 * GB, 200,
+        18 * GB, 18, 6, 0, 0, 0);
+    checkResources(xMidQueue, 0, 0, 0, 0, 0, 100 * GB, 100,
+        18 * GB, 18, 6, 0, 0, 0);
+    checkResources(xLeafQueue, 0, 0, 0, 0, 0, 50 * GB, 50,
+        18 * GB, 18, 6, 0, 0, 0);
+    checkResources(xRootUser, 0, 0, 0, 0, 0, 20 * GB, 20,
+        18 * GB, 18, 6, 0, 0, 0);
+    checkResources(xMidUser, 0, 0, 0, 0, 0, 10 * GB, 10,
+        18 * GB, 18, 6, 0, 0, 0);
+    checkResources(xLeafUser, 0, 0, 0, 0, 0, 5 * GB, 5,
+        18 * GB, 18, 6, 0, 0, 0);
+
+    // Partition y sources.
+    MetricsSource yPartition = partitionSource(leafMs, partY);
+    MetricsSource yRootQueue = queueSource(leafMs, partY, rootName);
+    MetricsSource yMidQueue = queueSource(leafMs, partY, midName);
+    MetricsSource yLeafQueue = queueSource(leafMs, partY, leafName);
+    MetricsSource yRootUser = userSource(leafMs, partY, user, rootName);
+    MetricsSource yMidUser = userSource(leafMs, partY, user, midName);
+    MetricsSource yLeafUser = userSource(leafMs, partY, user, leafName);
+
+    checkResources(yPartition, 0, 0, 0, 0, 0, 500 * GB, 500,
+        24 * GB, 24, 6, 0, 0, 0);
+    checkResources(yRootQueue, 0, 0, 0, 0, 0, 500 * GB, 500,
+        24 * GB, 24, 6, 0, 0, 0);
+    checkResources(yMidQueue, 0, 0, 0, 0, 0, 400 * GB, 400,
+        24 * GB, 24, 6, 0, 0, 0);
+    checkResources(yLeafQueue, 0, 0, 0, 0, 0, 300 * GB, 300,
+        24 * GB, 24, 6, 0, 0, 0);
+    checkResources(yRootUser, 0, 0, 0, 0, 0, 50 * GB, 50,
+        24 * GB, 24, 6, 0, 0, 0);
+    checkResources(yMidUser, 0, 0, 0, 0, 0, 40 * GB, 40,
+        24 * GB, 24, 6, 0, 0, 0);
+    checkResources(yLeafUser, 0, 0, 0, 0, 0, 30 * GB, 30,
+        24 * GB, 24, 6, 0, 0, 0);
+
+    // Finish the attempt and the application; nothing is asserted afterwards.
+    leafMetrics.finishAppAttempt(app.getApplicationId(), app.isPending(),
+        app.getUser());
+
+    leafMetrics.finishApp(user, RMAppState.FINISHED);
+  }
+
+  /**
+   * Structure:
+   * Both queues, q1 & q2, have been configured to run in only 1 partition, x.
+   * User metrics have been disabled, hence trying to access the user source
+   * throws an NPE; the test passes only if a NullPointerException escapes.
+   *
+   * <pre>
+   *      root
+   *     /    \
+   *   q1      q2
+   * </pre>
+   *
+   * @throws Exception the expected {@link NullPointerException}, or any
+   *           unexpected failure
+   */
+  @Test(expected = NullPointerException.class)
+  public void testSinglePartitionWithSingleLevelQueueMetricsWithoutUserMetrics()
+      throws Exception {
+
+    String parentQueueName = "root";
+    Queue parentQueue = mock(Queue.class);
+    String user = "alice";
+
+    // These forQueue calls pass false where the other tests in this class
+    // pass true; per the Javadoc above this is what disables user metrics.
+    QueueMetrics root = QueueMetrics.forQueue("root", null, false, CONF);
+    when(parentQueue.getMetrics()).thenReturn(root);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+    CSQueueMetrics q1 =
+        CSQueueMetrics.forQueue("root.q1", parentQueue, false, CONF);
+    CSQueueMetrics q2 =
+        CSQueueMetrics.forQueue("root.q2", parentQueue, false, CONF);
+
+    AppSchedulingInfo app = mockApp(user);
+
+    q1.submitApp(user);
+    q1.submitAppAttempt(user);
+
+    // Only root gets available resources in partition x.
+    root.setAvailableResourcesToQueue("x",
+        Resources.createResource(200 * GB, 200));
+
+    q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
+
+    MetricsSource partitionSource = partitionSource(q1.getMetricsSystem(), "x");
+    MetricsSource rootQueueSource =
+        queueSource(q1.getMetricsSystem(), "x", parentQueueName);
+    MetricsSource q1Source = queueSource(q1.getMetricsSystem(), "x", "root.q1");
+    MetricsSource q1UserSource =
+        userSource(q1.getMetricsSystem(), "x", user, "root.q1");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
+    // NOTE(review): per the Javadoc the NPE is expected when a user source is
+    // accessed. If it is raised by the next call, none of the statements after
+    // it ever run - confirm. The partition/root expectations further down also
+    // use 0 available where 200 GB is asserted above.
+    checkResources(q1UserSource, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
+
+    q2.incrPendingResources("x", user, 3, Resource.newInstance(1024, 1));
+    MetricsSource q2Source = queueSource(q2.getMetricsSystem(), "x", "root.q2");
+    MetricsSource q2UserSource =
+        userSource(q1.getMetricsSystem(), "x", user, "root.q2");
+
+    checkResources(partitionSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
+    checkResources(rootQueueSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
+    checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
+    checkResources(q2UserSource, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
+
+    q1.finishAppAttempt(app.getApplicationId(), app.isPending(), app.getUser());
+    q1.finishApp(user, RMAppState.FINISHED);
+  }
+
+  /**
+   * Looks up the metrics source registered for a partition.
+   *
+   * @param ms metrics system to query
+   * @param partition partition (node label) name
+   * @return whatever {@code ms.getSource} returns for the name built by
+   *         {@code QueueMetrics.pSourceName(partition)}
+   */
+  public static MetricsSource partitionSource(MetricsSystem ms,
+      String partition) {
+    String sourceName = QueueMetrics.pSourceName(partition).toString();
+    return ms.getSource(sourceName);
+  }
+
+  /**
+   * Looks up the metrics source registered for a queue within a partition.
+   *
+   * @param ms metrics system to query
+   * @param partition partition (node label) name
+   * @param queue queue name, e.g. {@code root.q1}
+   * @return whatever {@code ms.getSource} returns for the partition source
+   *         name followed by {@code QueueMetrics.qSourceName(queue)}
+   */
+  public static MetricsSource queueSource(MetricsSystem ms, String partition,
+      String queue) {
+    String sourceName = QueueMetrics.pSourceName(partition)
+        .append(QueueMetrics.qSourceName(queue))
+        .toString();
+    return ms.getSource(sourceName);
+  }
+
+  /**
+   * Looks up the metrics source registered for a user of a queue within a
+   * partition.
+   *
+   * @param ms metrics system to query
+   * @param partition partition (node label) name
+   * @param user user name
+   * @param queue queue name, e.g. {@code root.q1}
+   * @return whatever {@code ms.getSource} returns for the partition and queue
+   *         source names followed by {@code ",user="} and the user name
+   */
+  public static MetricsSource userSource(MetricsSystem ms, String partition,
+      String user, String queue) {
+    String sourceName = QueueMetrics.pSourceName(partition)
+        .append(QueueMetrics.qSourceName(queue))
+        .append(",user=")
+        .append(user)
+        .toString();
+    return ms.getSource(sourceName);
+  }
+
+  /**
+   * Asserts the allocated, available and pending gauges of a metrics source.
+   *
+   * @param source metrics source to snapshot
+   * @param allocatedMB expected {@code AllocatedMB}
+   * @param allocatedCores expected {@code AllocatedVCores}
+   * @param allocCtnrs expected {@code AllocatedContainers}
+   * @param availableMB expected {@code AvailableMB}
+   * @param availableCores expected {@code AvailableVCores}
+   * @param pendingMB expected {@code PendingMB}
+   * @param pendingCores expected {@code PendingVCores}
+   * @param pendingCtnrs expected {@code PendingContainers}
+   */
+  public static void checkResources(MetricsSource source, long allocatedMB,
+      int allocatedCores, int allocCtnrs, long availableMB, int availableCores,
+      long pendingMB, int pendingCores, int pendingCtnrs) {
+    // One snapshot of the source backs every assertion below.
+    MetricsRecordBuilder snapshot = getMetrics(source);
+
+    // Allocated
+    assertGauge("AllocatedMB", allocatedMB, snapshot);
+    assertGauge("AllocatedVCores", allocatedCores, snapshot);
+    assertGauge("AllocatedContainers", allocCtnrs, snapshot);
+
+    // Available
+    assertGauge("AvailableMB", availableMB, snapshot);
+    assertGauge("AvailableVCores", availableCores, snapshot);
+
+    // Pending
+    assertGauge("PendingMB", pendingMB, snapshot);
+    assertGauge("PendingVCores", pendingCores, snapshot);
+    assertGauge("PendingContainers", pendingCtnrs, snapshot);
+  }
+
+  /**
+   * Builds a mocked {@link AppSchedulingInfo} for the given user.
+   *
+   * The tests in this class pass {@code app.getApplicationId()} to
+   * QueueMetrics, so that accessor is stubbed alongside
+   * {@code getApplicationAttemptId()}; left unstubbed, the mock would hand
+   * back {@code null} as the application id.
+   *
+   * @param user user name returned by {@code getUser()}
+   * @return the mock, with application id (1, 1) and attempt number 1
+   */
+  private static AppSchedulingInfo mockApp(String user) {
+    AppSchedulingInfo app = mock(AppSchedulingInfo.class);
+    when(app.getUser()).thenReturn(user);
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    ApplicationAttemptId id = BuilderUtils.newApplicationAttemptId(appId, 1);
+    when(app.getApplicationId()).thenReturn(appId);
+    when(app.getApplicationAttemptId()).thenReturn(id);
+    return app;
+  }
+
+  /**
+   * Asserts the allocated, aggregate, available, pending and reserved
+   * metrics of a metrics source.
+   *
+   * @param source metrics source to snapshot
+   * @param allocatedMB expected {@code AllocatedMB}
+   * @param allocatedCores expected {@code AllocatedVCores}
+   * @param allocCtnrs expected {@code AllocatedContainers}
+   * @param aggreAllocCtnrs expected {@code AggregateContainersAllocated}
+   * @param aggreReleasedCtnrs expected {@code AggregateContainersReleased}
+   * @param availableMB expected {@code AvailableMB}
+   * @param availableCores expected {@code AvailableVCores}
+   * @param pendingMB expected {@code PendingMB}
+   * @param pendingCores expected {@code PendingVCores}
+   * @param pendingCtnrs expected {@code PendingContainers}
+   * @param reservedMB expected {@code ReservedMB}
+   * @param reservedCores expected {@code ReservedVCores}
+   * @param reservedCtnrs expected {@code ReservedContainers}
+   */
+  public static void checkResources(MetricsSource source, long allocatedMB,
+      int allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
+      long aggreReleasedCtnrs, long availableMB, int availableCores,
+      long pendingMB, int pendingCores, int pendingCtnrs, long reservedMB,
+      int reservedCores, int reservedCtnrs) {
+    // One snapshot of the source backs every assertion below.
+    MetricsRecordBuilder snapshot = getMetrics(source);
+
+    // Allocated
+    assertGauge("AllocatedMB", allocatedMB, snapshot);
+    assertGauge("AllocatedVCores", allocatedCores, snapshot);
+    assertGauge("AllocatedContainers", allocCtnrs, snapshot);
+
+    // Aggregate counters
+    assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, snapshot);
+    assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, snapshot);
+
+    // Available
+    assertGauge("AvailableMB", availableMB, snapshot);
+    assertGauge("AvailableVCores", availableCores, snapshot);
+
+    // Pending
+    assertGauge("PendingMB", pendingMB, snapshot);
+    assertGauge("PendingVCores", pendingCores, snapshot);
+    assertGauge("PendingContainers", pendingCtnrs, snapshot);
+
+    // Reserved
+    assertGauge("ReservedMB", reservedMB, snapshot);
+    assertGauge("ReservedVCores", reservedCores, snapshot);
+    assertGauge("ReservedContainers", reservedCtnrs, snapshot);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
index c110b1c..25062b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
@@ -93,7 +93,7 @@ public class TestSchedulerApplicationAttempt {
     app.liveContainers.put(container1.getContainerId(), container1);
     SchedulerNode node = createNode();
     app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node,
-        toSchedulerKey(requestedPriority), container1.getContainer());
+        toSchedulerKey(requestedPriority), container1);
 
     // Active user count has to decrease from queue2 due to app has NO pending requests
     assertEquals(0, queue2.getAbstractUsersManager().getNumActiveUsers());
@@ -135,7 +135,7 @@ public class TestSchedulerApplicationAttempt {
     app.liveContainers.put(container1.getContainerId(), container1);
     SchedulerNode node = createNode();
     app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node,
-        toSchedulerKey(requestedPriority), container1.getContainer());
+        toSchedulerKey(requestedPriority), container1);
     
     // Reserved container
     Priority prio1 = Priority.newInstance(1);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 3b76fb2..107428c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -1181,9 +1181,9 @@ public class TestLeafQueue {
     qb.finishApplication(app_0.getApplicationId(), user_0);
     qb.finishApplication(app_2.getApplicationId(), user_1);
     qb.releaseResource(clusterResource, app_0, Resource.newInstance(4*GB, 1),
-        null, null);
+        "", null);
     qb.releaseResource(clusterResource, app_2, Resource.newInstance(4*GB, 1),
-        null, null);
+        "", null);
 
     qb.setUserLimit(50);
     qb.setUserLimitFactor(1);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 54b5047..636cb99 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -25,9 +25,11 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -47,11 +49,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestPartitionQueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -2032,10 +2037,9 @@ public class TestNodeLabelContainerAllocation {
     assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB());
     assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB());
 
-    // The total memory tracked by QueueMetrics is 0GB for the default partition
     CSQueue rootQueue = cs.getRootQueue();
-    assertEquals(0*GB, rootQueue.getMetrics().getAvailableMB() +
-        rootQueue.getMetrics().getAllocatedMB());
+    assertEquals(0 * GB, rootQueue.getMetrics().getAvailableMB()
+        + rootQueue.getMetrics().getAllocatedMB());
 
     // Kill all apps in queue a
     cs.killAllAppsInQueue("a");
@@ -2084,6 +2088,8 @@ public class TestNodeLabelContainerAllocation {
     csConf.setCapacityByLabel(queueB, "x", 50);
     csConf.setMaximumCapacityByLabel(queueB, "x", 50);
 
+    csConf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+
     // set node -> label
     mgr.addToCluserNodeLabels(
         ImmutableSet.of(NodeLabel.newInstance("x", false)));
@@ -2102,6 +2108,54 @@ public class TestNodeLabelContainerAllocation {
     rm1.start();
     MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
     MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <no_label>
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+    double delta = 0.0001;
+    CSQueue leafQueue = cs.getQueue("a");
+    CSQueue leafQueueB = cs.getQueue("b");
+    CSQueue rootQueue = cs.getRootQueue();
+    assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB(), delta);
+    assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+    assertEquals(7.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta);
+
+    MetricsSystem ms = leafQueueB.getMetrics().getMetricsSystem();
+    QueueMetrics partXMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "x");
+    QueueMetrics partDefaultMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "");
+    QueueMetrics queueAMetrics =
+        (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a");
+    QueueMetrics queueBMetrics =
+        (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.b");
+    QueueMetrics queueAPartDefaultMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a");
+    QueueMetrics queueAPartXMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a");
+    QueueMetrics queueBPartDefaultMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.b");
+    QueueMetrics queueBPartXMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.b");
+    QueueMetrics rootMetrics =
+        (QueueMetrics) TestQueueMetrics.queueSource(ms, "root");
+    assertEquals(10 * GB, partXMetrics.getAvailableMB(), delta);
+    assertEquals(10 * GB, partDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+    assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta);
+    assertEquals(10 * GB, rootMetrics.getAvailableMB(), delta);
+    assertEquals(2.5 * GB, queueAMetrics.getAvailableMB(), delta);
+    assertEquals(7.5 * GB, queueBMetrics.getAvailableMB(), delta);
+
     // app1 -> a
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
@@ -2109,47 +2163,73 @@ public class TestNodeLabelContainerAllocation {
     // app1 asks for 3 partition= containers
     am1.allocate("*", 1 * GB, 3, new ArrayList<ContainerId>());
 
-    // NM1 do 50 heartbeats
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
-    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
     for (int i = 0; i < 50; i++) {
       cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
     }
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
 
     // app1 gets all resource in partition=x (non-exclusive)
     Assert.assertEquals(3, schedulerNode1.getNumContainers());
-
     SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
         .getNodeReport(nm1.getNodeId());
     Assert.assertEquals(3 * GB, reportNm1.getUsedResource().getMemorySize());
     Assert.assertEquals(7 * GB,
         reportNm1.getAvailableResource().getMemorySize());
-
     SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
         .getNodeReport(nm2.getNodeId());
     Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize());
     Assert.assertEquals(9 * GB,
         reportNm2.getAvailableResource().getMemorySize());
-
-    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
-    // 3GB is used from label x quota. 1.5 GB is remaining from default label.
-    // 2GB is remaining from label x.
-    assertEquals(15 * GB / 10, leafQueue.getMetrics().getAvailableMB());
+    assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta);
+    assertEquals(9 * GB, partDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
+    assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+    assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
+    assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta);
+    assertEquals(1 * GB, queueAMetrics.getAllocatedMB(), delta);
+    assertEquals(1.5 * GB, queueAMetrics.getAvailableMB(), delta);
+    assertEquals(0 * GB, queueBMetrics.getAllocatedMB(), delta);
+    assertEquals(7.5 * GB, queueBMetrics.getAvailableMB(), delta);
+    assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, queueAPartDefaultMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, queueBPartDefaultMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, queueBPartXMetrics.getPendingMB(), delta);
+    assertEquals(1.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
     assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB());
+    assertEquals(3 * GB, partXMetrics.getAllocatedMB(), delta);
+    assertEquals(1 * GB, partDefaultMetrics.getAllocatedMB(), delta);
+
+    QueueMetrics partDefaultQueueAUserMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.userSource(ms, "", "user",
+            "root.a");
+    QueueMetrics partXQueueAUserMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.userSource(ms, "x", "user",
+            "root.a");
+    QueueMetrics queueAUserMetrics =
+        (QueueMetrics) TestQueueMetrics.userSource(ms, "root.a", "user");
+
+    assertEquals(2 * GB, queueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(1 * GB, queueAUserMetrics.getAllocatedMB(), delta);
+    assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
+    assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+    assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
+    assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta);
+    assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta);
 
-    // app1 asks for 1 default partition container
     am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
 
-    // NM2 do couple of heartbeats
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
     cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
 
-    // app1 gets all resource in default partition
     Assert.assertEquals(2, schedulerNode2.getNumContainers());
+    Assert.assertEquals(3, schedulerNode1.getNumContainers());
 
     // 3GB is used from label x quota. 2GB used from default label.
     // So 0.5 GB is remaining from default label.
@@ -2158,10 +2238,100 @@ public class TestNodeLabelContainerAllocation {
 
     // The total memory tracked by QueueMetrics is 10GB
     // for the default partition
-    CSQueue rootQueue = cs.getRootQueue();
     assertEquals(10*GB, rootQueue.getMetrics().getAvailableMB() +
         rootQueue.getMetrics().getAllocatedMB());
 
+    assertEquals(0.5 * GB, queueAMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, queueAMetrics.getAllocatedMB());
+    assertEquals(0.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
+    assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+    assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
+    assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAvailableMB(),
+      delta);
+    assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta);
+    assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta);
+    assertEquals(1 * GB, queueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, queueAUserMetrics.getAllocatedMB(), delta);
+    assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta);
+    assertEquals(3 * GB, partXMetrics.getAllocatedMB(), delta);
+    assertEquals(8 * GB, partDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, partDefaultMetrics.getAllocatedMB(), delta);
+
+    // Pending Resources when containers are waiting on "default" partition
+    assertEquals(4 * GB, queueAMetrics.getPendingMB(), delta);
+    assertEquals(4 * GB, queueAPartDefaultMetrics.getPendingMB(), delta);
+    assertEquals(4 * GB, partDefaultQueueAUserMetrics.getPendingMB(),
+        delta);
+    assertEquals(4 * GB, queueAUserMetrics.getPendingMB(), delta);
+    assertEquals(4 * GB, partDefaultMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, partXQueueAUserMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, partXMetrics.getPendingMB(), delta);
+
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    assertEquals(0.5 * GB, queueAMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, queueAMetrics.getAllocatedMB());
+    assertEquals(0.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
+    assertEquals(0 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+    assertEquals(7 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
+    assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAvailableMB(),
+      delta);
+    assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta);
+    assertEquals(0 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(7 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta);
+    assertEquals(1 * GB, queueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, queueAUserMetrics.getAllocatedMB(), delta);
+    assertEquals(3 * GB, partXMetrics.getAvailableMB(), delta);
+    assertEquals(7 * GB, partXMetrics.getAllocatedMB(), delta);
+    assertEquals(8 * GB, partDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, partDefaultMetrics.getAllocatedMB(), delta);
+
+    // Pending resources after containers have been assigned on "x" partition
+    assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, queueAPartDefaultMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, partDefaultQueueAUserMetrics.getPendingMB(),
+        delta);
+    assertEquals(0 * GB, queueAUserMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, partDefaultMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, partXQueueAUserMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, partXMetrics.getPendingMB(), delta);
+
+    rm1.killApp(app1.getApplicationId());
+    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB(), delta);
+    assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+    assertEquals(7.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta);
+    assertEquals(2, queueAMetrics.getAggregateAllocatedContainers());
+    assertEquals(2, queueAMetrics.getAggegatedReleasedContainers());
+    assertEquals(2, queueAPartDefaultMetrics.getAggregateAllocatedContainers());
+    assertEquals(2, queueAPartDefaultMetrics.getAggegatedReleasedContainers());
+    assertEquals(7, partXMetrics.getAggregateAllocatedContainers());
+    assertEquals(2, partDefaultMetrics.getAggregateAllocatedContainers());
+    assertEquals(7, queueAPartXMetrics.getAggregateAllocatedContainers());
+    assertEquals(7, queueAPartXMetrics.getAggegatedReleasedContainers());
+    assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+    assertEquals(3 * GB, queueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(3 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(5 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
     rm1.close();
   }
 
@@ -2278,8 +2448,8 @@ public class TestNodeLabelContainerAllocation {
     // The total memory tracked by QueueMetrics is 12GB
     // for the default partition
     CSQueue rootQueue = cs.getRootQueue();
-    assertEquals(12*GB, rootQueue.getMetrics().getAvailableMB() +
-        rootQueue.getMetrics().getAllocatedMB());
+    assertEquals(12 * GB, rootQueue.getMetrics().getAvailableMB()
+        + rootQueue.getMetrics().getAllocatedMB());
 
     // Kill all apps in queue a
     cs.killAllAppsInQueue("a");
@@ -2292,6 +2462,193 @@ public class TestNodeLabelContainerAllocation {
   }
 
   @Test
+  public void testTwoLevelQueueMetricsWithLabels() throws Exception {
+
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
+        this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] {"a"});
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(queueA, 100);
+    csConf.setAccessibleNodeLabels(queueA, toSet("x"));
+    csConf.setCapacityByLabel(queueA, "x", 100);
+    csConf.setMaximumCapacityByLabel(queueA, "x", 100);
+
+    csConf.setQueues(queueA, new String[] {"a1"});
+    final String queueA1 = queueA + ".a1";
+    csConf.setCapacity(queueA1, 100);
+
+    csConf.setAccessibleNodeLabels(queueA1, toSet("x"));
+    csConf.setCapacityByLabel(queueA1, "x", 100);
+    csConf.setMaximumCapacityByLabel(queueA1, "x", 100);
+
+    // set node -> label
+    // label x exclusivity is set to true
+    mgr.addToCluserNodeLabels(
+        ImmutableSet.of(NodeLabel.newInstance("x", true)));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB); // label = <no_label>
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    ParentQueue leafQueueA = (ParentQueue) cs.getQueue("a");
+    LeafQueue leafQueueA1 = (LeafQueue) cs.getQueue("a1");
+    assertEquals(12 * GB, leafQueueA1.getMetrics().getAvailableMB());
+    assertEquals(0 * GB, leafQueueA1.getMetrics().getAllocatedMB());
+    MetricsSystem ms = leafQueueA1.getMetrics().getMetricsSystem();
+    QueueMetrics partXMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "x");
+    QueueMetrics partDefaultMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "");
+    QueueMetrics queueAPartDefaultMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a");
+    QueueMetrics queueAPartXMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a");
+    QueueMetrics queueA1PartDefaultMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a.a1");
+    QueueMetrics queueA1PartXMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a.a1");
+    QueueMetrics queueRootPartDefaultMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root");
+    QueueMetrics queueRootPartXMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root");
+    QueueMetrics queueAMetrics =
+        (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a");
+    QueueMetrics queueA1Metrics =
+        (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a.a1");
+    QueueMetrics queueRootMetrics =
+        (QueueMetrics) TestQueueMetrics.queueSource(ms, "root");
+    assertEquals(12 * GB, queueAMetrics.getAvailableMB());
+    assertEquals(12 * GB, queueA1Metrics.getAvailableMB());
+    assertEquals(12 * GB, queueRootMetrics.getAvailableMB());
+    assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB());
+    assertEquals(10 * GB, queueA1PartXMetrics.getAvailableMB());
+    assertEquals(10 * GB, queueAPartXMetrics.getAvailableMB());
+    assertEquals(10 * GB, queueRootPartXMetrics.getAvailableMB());
+    assertEquals(12 * GB, queueA1PartDefaultMetrics.getAvailableMB());
+    assertEquals(12 * GB, queueAPartDefaultMetrics.getAvailableMB());
+    assertEquals(12 * GB, queueRootPartDefaultMetrics.getAvailableMB());
+    assertEquals(10 * GB, partXMetrics.getAvailableMB());
+    assertEquals(12 * GB, partDefaultMetrics.getAvailableMB());
+
+    // app1 -> a1, partition=x
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // app1 asks for 5 partition=x containers
+    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
+    // NM1 do 50 heartbeats
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // app1 gets its AM container plus 5 requested containers in partition=x
+    Assert.assertEquals(6, schedulerNode1.getNumContainers());
+
+    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
+        .getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
+    Assert.assertEquals(4 * GB, reportNm1.getAvailableResource().getMemorySize());
+
+    SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
+        .getNodeReport(nm2.getNodeId());
+    Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize());
+    Assert.assertEquals(12 * GB,
+        reportNm2.getAvailableResource().getMemorySize());
+
+    assertEquals(0 * GB, queueAMetrics.getAllocatedMB());
+    assertEquals(0 * GB, queueA1Metrics.getAllocatedMB());
+    assertEquals(0 * GB, queueRootMetrics.getAllocatedMB());
+    assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
+    assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
+    assertEquals(6 * GB, queueA1PartXMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueAPartXMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueRootPartXMetrics.getAllocatedMB());
+    assertEquals(0 * GB, queueA1PartDefaultMetrics.getAllocatedMB());
+    assertEquals(0 * GB, queueAPartDefaultMetrics.getAllocatedMB());
+    assertEquals(0 * GB, queueRootPartDefaultMetrics.getAllocatedMB());
+    assertEquals(6 * GB, partXMetrics.getAllocatedMB());
+    assertEquals(0 * GB, partDefaultMetrics.getAllocatedMB());
+
+    // app2 -> a1, default partition
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // app2 asks for 5 partition= containers
+    am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "");
+    // NM2 do 50 heartbeats
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
+
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // app2 gets its AM container plus 5 containers in the default partition
+    Assert.assertEquals(6, schedulerNode2.getNumContainers());
+
+    reportNm1 = rm1.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
+    Assert.assertEquals(4 * GB,
+        reportNm1.getAvailableResource().getMemorySize());
+
+    reportNm2 = rm1.getResourceScheduler().getNodeReport(nm2.getNodeId());
+    Assert.assertEquals(6 * GB, reportNm2.getUsedResource().getMemorySize());
+    Assert.assertEquals(6 * GB,
+        reportNm2.getAvailableResource().getMemorySize());
+
+    assertEquals(6 * GB, leafQueueA.getMetrics().getAvailableMB());
+    assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB());
+
+    // The total memory tracked by QueueMetrics is 12GB
+    // for the default partition
+    CSQueue rootQueue = cs.getRootQueue();
+    assertEquals(12 * GB, rootQueue.getMetrics().getAvailableMB()
+        + rootQueue.getMetrics().getAllocatedMB());
+
+    assertEquals(6 * GB, queueAMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueA1Metrics.getAllocatedMB());
+    assertEquals(6 * GB, queueRootMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueA1PartXMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueAPartXMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueRootPartXMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueA1PartDefaultMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueAPartDefaultMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueRootPartDefaultMetrics.getAllocatedMB());
+    assertEquals(6 * GB, partXMetrics.getAllocatedMB());
+    assertEquals(6 * GB, partDefaultMetrics.getAllocatedMB());
+
+    // Kill all apps in queue a1
+    cs.killAllAppsInQueue("a1");
+    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+    rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());
+    assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB());
+    assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores());
+    rm1.close();
+  }
+
+  @Test
   public void testQueueMetricsWithLabelsDisableElasticity() throws Exception {
     /**
      * Test case: have a following queue structure:


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org