You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2019/10/15 22:29:08 UTC

[hadoop] branch branch-3.1 updated: YARN-8842. Expose metrics for custom resource types in QueueMetrics. (Contributed by Szilard Nemeth)

This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new a70c6e9  YARN-8842. Expose metrics for custom resource types in QueueMetrics. (Contributed by Szilard Nemeth)
a70c6e9 is described below

commit a70c6e96652a92054adbbc9514c63384c0f015d2
Author: Haibo Chen <ha...@apache.org>
AuthorDate: Tue Oct 16 14:12:02 2018 -0700

    YARN-8842. Expose metrics for custom resource types in QueueMetrics. (Contributed by Szilard Nemeth)
    
    (cherry picked from commit 84e22a6af46db2859d7d2caf192861cae9b6a1a8)
---
 .../resourcetypes/ResourceTypesTestHelper.java     |  22 +
 .../resourcemanager/scheduler/QueueMetrics.java    | 129 ++++-
 .../scheduler/QueueMetricsForCustomResources.java  | 158 +++++
 .../scheduler/capacity/CapacityScheduler.java      |   5 +-
 .../resourcemanager/scheduler/QueueInfo.java       |  90 +++
 .../scheduler/QueueMetricsTestData.java            | 105 ++++
 .../scheduler/ResourceMetricsChecker.java          |  88 +--
 .../scheduler/TestQueueMetrics.java                | 250 ++++----
 .../TestQueueMetricsForCustomResources.java        | 645 +++++++++++++++++++++
 9 files changed, 1324 insertions(+), 168 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java
index 98a8a00..3c3c2cc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java
@@ -16,6 +16,7 @@
 
 package org.apache.hadoop.yarn.resourcetypes;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -24,6 +25,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * Contains helper methods to create Resource and ResourceInformation objects.
@@ -90,4 +92,24 @@ public final class ResourceTypesTestHelper {
     return new ResourceValueAndUnit(value, matcher.group(2));
   }
 
+  public static Map<String, Long> extractCustomResources(Resource res) {
+    Map<String, Long> customResources = Maps.newHashMap();
+    for (int i = 0; i < res.getResources().length; i++) {
+      ResourceInformation ri = res.getResourceInformation(i);
+      if (!ri.getName().equals(ResourceInformation.MEMORY_URI)
+          && !ri.getName().equals(ResourceInformation.VCORES_URI)) {
+        customResources.put(ri.getName(), ri.getValue());
+      }
+    }
+    return customResources;
+  }
+
+  public static Map<String, String> extractCustomResourcesAsStrings(
+      Resource res) {
+    Map<String, Long> resValues = extractCustomResources(res);
+    return resValues.entrySet().stream()
+        .collect(Collectors.toMap(
+            Map.Entry::getKey, e -> String.valueOf(e.getValue())));
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
index 4b70502..bb8d1d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
@@ -46,7 +46,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .QueueMetricsForCustomResources.QueueMetricsCustomResource;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -119,6 +121,7 @@ public class QueueMetrics implements MetricsSource {
   protected final MetricsSystem metricsSystem;
   protected final Map<String, QueueMetrics> users;
   protected final Configuration conf;
+  private QueueMetricsForCustomResources queueMetricsForCustomResources;
 
   protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, 
 	       boolean enableUserMetrics, Configuration conf) {
@@ -130,6 +133,11 @@ public class QueueMetrics implements MetricsSource {
     metricsSystem = ms;
     this.conf = conf;
     runningTime = buildBuckets(conf);
+
+    if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
+      this.queueMetricsForCustomResources =
+          new QueueMetricsForCustomResources();
+    }
   }
 
   protected QueueMetrics tag(MetricsInfo info, String value) {
@@ -355,9 +363,12 @@ public class QueueMetrics implements MetricsSource {
    * @param limit resource limit
    */
   public void setAvailableResourcesToQueue(String partition, Resource limit) {
-    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
       availableMB.set(limit.getMemorySize());
       availableVCores.set(limit.getVirtualCores());
+      if (queueMetricsForCustomResources != null) {
+        queueMetricsForCustomResources.setAvailable(limit);
+      }
     }
   }
 
@@ -397,7 +408,7 @@ public class QueueMetrics implements MetricsSource {
    */
   public void incrPendingResources(String partition, String user,
       int containers, Resource res) {
-    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
       _incrPendingResources(containers, res);
       QueueMetrics userMetrics = getUserMetrics(user);
       if (userMetrics != null) {
@@ -413,12 +424,15 @@ public class QueueMetrics implements MetricsSource {
     pendingContainers.incr(containers);
     pendingMB.incr(res.getMemorySize() * containers);
     pendingVCores.incr(res.getVirtualCores() * containers);
+    if (queueMetricsForCustomResources != null) {
+      queueMetricsForCustomResources.increasePending(res, containers);
+    }
   }
 
 
   public void decrPendingResources(String partition, String user,
       int containers, Resource res) {
-    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
       _decrPendingResources(containers, res);
       QueueMetrics userMetrics = getUserMetrics(user);
       if (userMetrics != null) {
@@ -434,6 +448,9 @@ public class QueueMetrics implements MetricsSource {
     pendingContainers.decr(containers);
     pendingMB.decr(res.getMemorySize() * containers);
     pendingVCores.decr(res.getVirtualCores() * containers);
+    if (queueMetricsForCustomResources != null) {
+      queueMetricsForCustomResources.decreasePending(res, containers);
+    }
   }
 
   public void incrNodeTypeAggregations(String user, NodeType type) {
@@ -457,12 +474,16 @@ public class QueueMetrics implements MetricsSource {
 
   public void allocateResources(String partition, String user,
       int containers, Resource res, boolean decrPending) {
-    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
       allocatedContainers.incr(containers);
       aggregateContainersAllocated.incr(containers);
 
       allocatedMB.incr(res.getMemorySize() * containers);
       allocatedVCores.incr(res.getVirtualCores() * containers);
+      if (queueMetricsForCustomResources != null) {
+        queueMetricsForCustomResources.increaseAllocated(res, containers);
+      }
+
       if (decrPending) {
         _decrPendingResources(containers, res);
       }
@@ -484,12 +505,18 @@ public class QueueMetrics implements MetricsSource {
    * @param res
    */
   public void allocateResources(String partition, String user, Resource res) {
-    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
       allocatedMB.incr(res.getMemorySize());
       allocatedVCores.incr(res.getVirtualCores());
+      if (queueMetricsForCustomResources != null) {
+        queueMetricsForCustomResources.increaseAllocated(res);
+      }
 
       pendingMB.decr(res.getMemorySize());
       pendingVCores.decr(res.getVirtualCores());
+      if (queueMetricsForCustomResources != null) {
+        queueMetricsForCustomResources.decreasePending(res);
+      }
 
       QueueMetrics userMetrics = getUserMetrics(user);
       if (userMetrics != null) {
@@ -503,11 +530,15 @@ public class QueueMetrics implements MetricsSource {
 
   public void releaseResources(String partition,
       String user, int containers, Resource res) {
-    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
       allocatedContainers.decr(containers);
       aggregateContainersReleased.incr(containers);
       allocatedMB.decr(res.getMemorySize() * containers);
       allocatedVCores.decr(res.getVirtualCores() * containers);
+      if (queueMetricsForCustomResources != null) {
+        queueMetricsForCustomResources.decreaseAllocated(res, containers);
+      }
+
       QueueMetrics userMetrics = getUserMetrics(user);
       if (userMetrics != null) {
         userMetrics.releaseResources(partition, user, containers, res);
@@ -524,9 +555,13 @@ public class QueueMetrics implements MetricsSource {
    * @param user
    * @param res
    */
-  public void releaseResources(String user, Resource res) {
+  private void releaseResources(String user, Resource res) {
     allocatedMB.decr(res.getMemorySize());
     allocatedVCores.decr(res.getVirtualCores());
+    if (queueMetricsForCustomResources != null) {
+      queueMetricsForCustomResources.decreaseAllocated(res);
+    }
+
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
       userMetrics.releaseResources(user, res);
@@ -557,6 +592,17 @@ public class QueueMetrics implements MetricsSource {
     }
   }
 
+  public void updatePreemptedSecondsForCustomResources(Resource res,
+          long seconds) {
+    if (queueMetricsForCustomResources != null) {
+      queueMetricsForCustomResources
+          .increaseAggregatedPreemptedSeconds(res, seconds);
+    }
+    if (parent != null) {
+      parent.updatePreemptedSecondsForCustomResources(res, seconds);
+    }
+  }
+
   public void updatePreemptedResources(Resource res) {
     aggregateMemoryMBPreempted.incr(res.getMemorySize());
     aggregateVcoresPreempted.incr(res.getVirtualCores());
@@ -566,7 +612,7 @@ public class QueueMetrics implements MetricsSource {
   }
 
   public void reserveResource(String partition, String user, Resource res) {
-    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
       reserveResource(user, res);
     }
   }
@@ -575,6 +621,9 @@ public class QueueMetrics implements MetricsSource {
     reservedContainers.incr();
     reservedMB.incr(res.getMemorySize());
     reservedVCores.incr(res.getVirtualCores());
+    if (queueMetricsForCustomResources != null) {
+      queueMetricsForCustomResources.increaseReserved(res);
+    }
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
       userMetrics.reserveResource(user, res);
@@ -584,10 +633,13 @@ public class QueueMetrics implements MetricsSource {
     }
   }
 
-  public void unreserveResource(String user, Resource res) {
+  private void unreserveResource(String user, Resource res) {
     reservedContainers.decr();
     reservedMB.decr(res.getMemorySize());
     reservedVCores.decr(res.getVirtualCores());
+    if (queueMetricsForCustomResources != null) {
+      queueMetricsForCustomResources.decreaseReserved(res);
+    }
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
       userMetrics.unreserveResource(user, res);
@@ -598,7 +650,7 @@ public class QueueMetrics implements MetricsSource {
   }
 
   public void unreserveResource(String partition, String user, Resource res) {
-    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
       unreserveResource(user, res);
     }
   }
@@ -660,10 +712,59 @@ public class QueueMetrics implements MetricsSource {
   public int getAppsFailed() {
     return appsFailed.value();
   }
-  
+
   public Resource getAllocatedResources() {
-    return BuilderUtils.newResource(allocatedMB.value(),
-        (int) allocatedVCores.value());
+    if (queueMetricsForCustomResources != null) {
+      return Resource.newInstance(allocatedMB.value(), allocatedVCores.value(),
+              queueMetricsForCustomResources.getAllocatedValues());
+    }
+    return Resource.newInstance(allocatedMB.value(),
+            allocatedVCores.value());
+  }
+
+  public Resource getAvailableResources() {
+    if (queueMetricsForCustomResources != null) {
+      return Resource.newInstance(availableMB.value(), availableVCores.value(),
+          queueMetricsForCustomResources.getAvailableValues());
+    }
+    return Resource.newInstance(availableMB.value(), availableVCores.value());
+  }
+
+  public Resource getPendingResources() {
+    if (queueMetricsForCustomResources != null) {
+      return Resource.newInstance(pendingMB.value(), pendingVCores.value(),
+          queueMetricsForCustomResources.getPendingValues());
+    }
+    return Resource.newInstance(pendingMB.value(), pendingVCores.value());
+  }
+
+  public Resource getReservedResources() {
+    if (queueMetricsForCustomResources != null) {
+      return Resource.newInstance(reservedMB.value(), reservedVCores.value(),
+          queueMetricsForCustomResources.getReservedValues());
+    }
+    return Resource.newInstance(reservedMB.value(), reservedVCores.value());
+  }
+
+  /**
+   * Handle this specially as this has a long value and it could be
+   * truncated when cast into an int parameter of
+   * Resource.newInstance (vCores).
+   * @return QueueMetricsCustomResource
+   */
+  @VisibleForTesting
+  public QueueMetricsCustomResource getAggregatedPreemptedSecondsResources() {
+    return queueMetricsForCustomResources.getAggregatePreemptedSeconds();
+  }
+
+  @VisibleForTesting
+  public MutableCounterLong getAggregateMemoryMBSecondsPreempted() {
+    return aggregateMemoryMBSecondsPreempted;
+  }
+
+  @VisibleForTesting
+  public MutableCounterLong getAggregateVcoreSecondsPreempted() {
+    return aggregateVcoreSecondsPreempted;
   }
 
   public long getAllocatedMB() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java
new file mode 100644
index 0000000..8029584
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * This class is the main entry point for all kinds of metrics for
+ * custom resources.
+ * It provides increase and decrease methods for all types of metrics.
+ */
+public class QueueMetricsForCustomResources {
+  /**
+   * Class that holds metric values for custom resources in a map keyed by
+   * the name of the custom resource.
+   * There are different kinds of values like allocated, available and others.
+   */
+  public static class QueueMetricsCustomResource {
+    private final Map<String, Long> values = Maps.newHashMap();
+
+    protected void increase(Resource res) {
+      update(res, Long::sum);
+    }
+
+    void increaseWithMultiplier(Resource res, long multiplier) {
+      update(res, (v1, v2) -> v1 + v2 * multiplier);
+    }
+
+    protected void decrease(Resource res) {
+      update(res, (v1, v2) -> v1 - v2);
+    }
+
+    void decreaseWithMultiplier(Resource res, int containers) {
+      update(res, (v1, v2) -> v1 - v2 * containers);
+    }
+
+    protected void set(Resource res) {
+      update(res, (v1, v2) -> v2);
+    }
+
+    private void update(Resource res, BiFunction<Long, Long, Long> operation) {
+      if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
+        ResourceInformation[] resources = res.getResources();
+
+        for (int i = 2; i < resources.length; i++) {
+          ResourceInformation resource = resources[i];
+
+          // Map.merge applies the operation only when the key already
+          // has a value, so absent keys are seeded with 0 first
+          if (!values.containsKey(resource.getName())) {
+            values.put(resource.getName(), 0L);
+          }
+          values.merge(resource.getName(),
+              resource.getValue(), operation);
+        }
+      }
+    }
+
+    public Map<String, Long> getValues() {
+      return values;
+    }
+  }
+  private final QueueMetricsCustomResource aggregatePreemptedSeconds =
+      new QueueMetricsCustomResource();
+  private final QueueMetricsCustomResource allocated =
+      new QueueMetricsCustomResource();
+  private final QueueMetricsCustomResource available =
+      new QueueMetricsCustomResource();
+  private final QueueMetricsCustomResource pending =
+      new QueueMetricsCustomResource();
+
+  private final QueueMetricsCustomResource reserved =
+      new QueueMetricsCustomResource();
+
+  public void increaseReserved(Resource res) {
+    reserved.increase(res);
+  }
+
+  public void decreaseReserved(Resource res) {
+    reserved.decrease(res);
+  }
+
+  public void setAvailable(Resource res) {
+    available.set(res);
+  }
+
+  public void increasePending(Resource res, int containers) {
+    pending.increaseWithMultiplier(res, containers);
+  }
+
+  public void decreasePending(Resource res) {
+    pending.decrease(res);
+  }
+
+  public void decreasePending(Resource res, int containers) {
+    pending.decreaseWithMultiplier(res, containers);
+  }
+
+  public void increaseAllocated(Resource res) {
+    allocated.increase(res);
+  }
+
+  public void increaseAllocated(Resource res, int containers) {
+    allocated.increaseWithMultiplier(res, containers);
+  }
+
+  public void decreaseAllocated(Resource res) {
+    allocated.decrease(res);
+  }
+
+  public void decreaseAllocated(Resource res, int containers) {
+    allocated.decreaseWithMultiplier(res, containers);
+  }
+
+  public void increaseAggregatedPreemptedSeconds(Resource res, long seconds) {
+    aggregatePreemptedSeconds.increaseWithMultiplier(res, seconds);
+  }
+
+  Map<String, Long> getAllocatedValues() {
+    return allocated.getValues();
+  }
+
+  Map<String, Long> getAvailableValues() {
+    return available.getValues();
+  }
+
+  Map<String, Long> getPendingValues() {
+    return pending.getValues();
+  }
+
+  Map<String, Long> getReservedValues() {
+    return reserved.getValues();
+  }
+
+  QueueMetricsCustomResource getAggregatePreemptedSeconds() {
+    return aggregatePreemptedSeconds;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index f5da987..860b1d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -2023,7 +2023,8 @@ public class CapacityScheduler extends
   private void updateQueuePreemptionMetrics(
       CSQueue queue, RMContainer rmc) {
     QueueMetrics qMetrics = queue.getMetrics();
-    long usedMillis = rmc.getFinishTime() - rmc.getCreationTime();
+    final long usedMillis = rmc.getFinishTime() - rmc.getCreationTime();
+    final long usedSeconds = usedMillis / DateUtils.MILLIS_PER_SECOND;
     Resource containerResource = rmc.getAllocatedResource();
     qMetrics.preemptContainer();
     long mbSeconds = (containerResource.getMemorySize() * usedMillis)
@@ -2032,6 +2033,8 @@ public class CapacityScheduler extends
         / DateUtils.MILLIS_PER_SECOND;
     qMetrics.updatePreemptedMemoryMBSeconds(mbSeconds);
     qMetrics.updatePreemptedVcoreSeconds(vcSeconds);
+    qMetrics.updatePreemptedSecondsForCustomResources(containerResource,
+        usedSeconds);
     qMetrics.updatePreemptedResources(containerResource);
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueInfo.java
new file mode 100644
index 0000000..0a0f893
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueInfo.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+
+import java.util.function.Consumer;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .TestQueueMetrics.userSource;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * This class holds queue and user metrics for a particular queue,
+ * used for testing metrics.
+ * Reference for the parent queue is also stored for every queue,
+ * except if the queue is root.
+ */
+public final class QueueInfo {
+  private final QueueInfo parentQueueInfo;
+  private final Queue queue;
+  final QueueMetrics queueMetrics;
+  final MetricsSource queueSource;
+  final MetricsSource userSource;
+
+  public QueueInfo(QueueInfo parent, String queueName, MetricsSystem ms,
+      Configuration conf, String user) {
+    Queue parentQueue = parent == null ? null : parent.queue;
+    parentQueueInfo = parent;
+    queueMetrics =
+        QueueMetrics.forQueue(ms, queueName, parentQueue, true, conf);
+    queue = mock(Queue.class);
+    when(queue.getMetrics()).thenReturn(queueMetrics);
+    queueSource = ms.getSource(QueueMetrics.sourceName(queueName).toString());
+
+    // need to call getUserMetrics so that a non-null userSource is returned
+    // with the call to userSource(..)
+    queueMetrics.getUserMetrics(user);
+    userSource = userSource(ms, queueName, user);
+  }
+
+  public QueueInfo getRoot() {
+    QueueInfo root = this;
+    while (root.parentQueueInfo != null) {
+      root = root.parentQueueInfo;
+    }
+    return root;
+  }
+
+  public void checkAllQueueSources(Consumer<MetricsSource> consumer) {
+    checkQueueSourcesRecursive(this, consumer);
+  }
+
+  private void checkQueueSourcesRecursive(QueueInfo queueInfo,
+      Consumer<MetricsSource> consumer) {
+    consumer.accept(queueInfo.queueSource);
+    if (queueInfo.parentQueueInfo != null) {
+      checkQueueSourcesRecursive(queueInfo.parentQueueInfo, consumer);
+    }
+  }
+
+  public void checkAllQueueMetrics(Consumer<QueueMetrics> consumer) {
+    checkAllQueueMetricsRecursive(this, consumer);
+  }
+
+  private void checkAllQueueMetricsRecursive(QueueInfo queueInfo, Consumer
+      <QueueMetrics> consumer) {
+    consumer.accept(queueInfo.queueMetrics);
+    if (queueInfo.parentQueueInfo != null) {
+      checkAllQueueMetricsRecursive(queueInfo.parentQueueInfo, consumer);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsTestData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsTestData.java
new file mode 100644
index 0000000..56df7d3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsTestData.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper
+    .extractCustomResources;
+
+/**
+ * This class holds the data used to test standard and custom resource
+ * metrics of all types. Metric types can be one of: allocated, pending,
+ * reserved and other resources.
+ */
+public final class QueueMetricsTestData {
+  public static final class Builder {
+    private int containers;
+    private Resource resource;
+    private Resource resourceToDecrease;
+    private Map<String, Long> customResourceValues;
+    private int containersToDecrease;
+    private String user;
+    private String partition;
+    private QueueInfo queueInfo;
+
+    private Builder() {
+    }
+
+    public static Builder create() {
+      return new Builder();
+    }
+
+    public Builder withContainers(int containers) {
+      this.containers = containers;
+      return this;
+    }
+
+    public Builder withResourceToDecrease(Resource res, int containers) {
+      this.resourceToDecrease = res;
+      this.containersToDecrease = containers;
+      return this;
+    }
+
+    public Builder withResources(Resource res) {
+      this.resource = res;
+      return this;
+    }
+
+    public Builder withUser(String user) {
+      this.user = user;
+      return this;
+    }
+
+    public Builder withPartition(String partition) {
+      this.partition = partition;
+      return this;
+    }
+
+    public Builder withLeafQueue(QueueInfo qInfo) {
+      this.queueInfo = qInfo;
+      return this;
+    }
+
+    public QueueMetricsTestData build() {
+      this.customResourceValues = extractCustomResources(resource);
+      return new QueueMetricsTestData(this);
+    }
+  }
+
+  final Map<String, Long> customResourceValues;
+  final int containers;
+  final Resource resourceToDecrease;
+  final int containersToDecrease;
+  final Resource resource;
+  final String partition;
+  final QueueInfo leafQueue;
+  final String user;
+
+  private QueueMetricsTestData(Builder builder) {
+    this.customResourceValues = builder.customResourceValues;
+    this.containers = builder.containers;
+    this.resourceToDecrease = builder.resourceToDecrease;
+    this.containersToDecrease = builder.containersToDecrease;
+    this.resource = builder.resource;
+    this.partition = builder.partition;
+    this.leafQueue = builder.queueInfo;
+    this.user = builder.user;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java
index cd617d7..05341aab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java
@@ -27,34 +27,31 @@ import java.util.Map;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.COUNTER_LONG;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.GAUGE_INT;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.GAUGE_LONG;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
 
 final class ResourceMetricsChecker {
   private final static Logger LOG =
           LoggerFactory.getLogger(ResourceMetricsChecker.class);
 
+  enum ResourceMetricType { // the metric kind a ResourceMetricsKey declares
+    GAUGE_INT, GAUGE_LONG, COUNTER_INT, COUNTER_LONG
+  }
+
   private static final ResourceMetricsChecker INITIAL_CHECKER =
       new ResourceMetricsChecker()
           .gaugeLong(ALLOCATED_MB, 0)
@@ -72,29 +69,41 @@ final class ResourceMetricsChecker {
           .gaugeInt(RESERVED_CONTAINERS, 0);
 
   enum ResourceMetricsKey {
-    ALLOCATED_MB("AllocatedMB"),
-    ALLOCATED_V_CORES("AllocatedVCores"),
-    ALLOCATED_CONTAINERS("AllocatedContainers"),
-    AGGREGATE_CONTAINERS_ALLOCATED("AggregateContainersAllocated"),
-    AGGREGATE_CONTAINERS_RELEASED("AggregateContainersReleased"),
-    AVAILABLE_MB("AvailableMB"),
-    AVAILABLE_V_CORES("AvailableVCores"),
-    PENDING_MB("PendingMB"),
-    PENDING_V_CORES("PendingVCores"),
-    PENDING_CONTAINERS("PendingContainers"),
-    RESERVED_MB("ReservedMB"),
-    RESERVED_V_CORES("ReservedVCores"),
-    RESERVED_CONTAINERS("ReservedContainers");
+    ALLOCATED_MB("AllocatedMB", GAUGE_LONG),
+    ALLOCATED_V_CORES("AllocatedVCores", GAUGE_INT),
+    ALLOCATED_CONTAINERS("AllocatedContainers", GAUGE_INT),
+    AGGREGATE_CONTAINERS_ALLOCATED("AggregateContainersAllocated",
+        COUNTER_LONG),
+    AGGREGATE_CONTAINERS_RELEASED("AggregateContainersReleased",
+        COUNTER_LONG),
+    AVAILABLE_MB("AvailableMB", GAUGE_LONG),
+    AVAILABLE_V_CORES("AvailableVCores", GAUGE_INT),
+    PENDING_MB("PendingMB", GAUGE_LONG),
+    PENDING_V_CORES("PendingVCores", GAUGE_INT),
+    PENDING_CONTAINERS("PendingContainers", GAUGE_INT),
+    RESERVED_MB("ReservedMB", GAUGE_LONG),
+    RESERVED_V_CORES("ReservedVCores", GAUGE_INT),
+    RESERVED_CONTAINERS("ReservedContainers", GAUGE_INT),
+    AGGREGATE_VCORE_SECONDS_PREEMPTED(
+        "AggregateVcoreSecondsPreempted", COUNTER_LONG),
+    AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED(
+        "AggregateMemoryMBSecondsPreempted", COUNTER_LONG);
 
     private String value;
+    private ResourceMetricType type;
 
-    ResourceMetricsKey(String value) {
+    ResourceMetricsKey(String value, ResourceMetricType type) {
       this.value = value;
+      this.type = type;
     }
 
     public String getValue() {
       return value;
     }
+
+    public ResourceMetricType getType() {
+      return type;
+    }
   }
 
   private final Map<ResourceMetricsKey, Long> gaugesLong;
@@ -123,20 +132,31 @@ final class ResourceMetricsChecker {
   }
 
   ResourceMetricsChecker gaugeLong(ResourceMetricsKey key, long value) {
+    ensureTypeIsCorrect(key, GAUGE_LONG);
     gaugesLong.put(key, value);
     return this;
   }
 
   ResourceMetricsChecker gaugeInt(ResourceMetricsKey key, int value) {
+    ensureTypeIsCorrect(key, GAUGE_INT);
     gaugesInt.put(key, value);
     return this;
   }
 
   ResourceMetricsChecker counter(ResourceMetricsKey key, long value) {
+    ensureTypeIsCorrect(key, COUNTER_LONG);
     counters.put(key, value);
     return this;
   }
 
+  private void ensureTypeIsCorrect(ResourceMetricsKey key,
+      ResourceMetricType actualType) {
+    if (actualType != key.getType()) { // setter does not match the key
+      throw new IllegalStateException("Metrics type should be " + key.getType()
+          + " instead of " + actualType + " for metrics: " + key.getValue());
+    }
+  }
+
   ResourceMetricsChecker checkAgainst(MetricsSource source) {
     if (source == null) {
       throw new IllegalStateException("MetricsSource should not be null!");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
index c971d65..2066f60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
@@ -18,15 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
-import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .AppMetricsChecker.AppMetricsKey.*;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.*;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
@@ -46,8 +37,40 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_COMPLETED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_FAILED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_PENDING;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_RUNNING;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_SUBMITTED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class TestQueueMetrics {
+  private static Queue createMockQueue(QueueMetrics metrics) {
+    Queue queue = mock(Queue.class);
+    when(queue.getMetrics()).thenReturn(metrics);
+    return queue;
+  }
+
   private static final int GB = 1024; // MB
+  private static final String USER = "alice";
+  private static final String USER_2 = "dodo";
   private static final Configuration conf = new Configuration();
   private MetricsSystem ms;
 
@@ -60,19 +83,18 @@ public class TestQueueMetrics {
   @Test
   public void testDefaultSingleQueueMetrics() {
     String queueName = "single";
-    String user = "alice";
 
     QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false,
 						 conf);
     MetricsSource queueSource= queueSource(ms, queueName);
-    AppSchedulingInfo app = mockApp(user);
+    AppSchedulingInfo app = mockApp(USER);
 
-    metrics.submitApp(user);
-    MetricsSource userSource = userSource(ms, queueName, user);
+    metrics.submitApp(USER);
+    MetricsSource userSource = userSource(ms, queueName, USER);
     AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
         .counter(APPS_SUBMITTED, 1)
         .checkAgainst(queueSource, true);
-    metrics.submitAppAttempt(user);
+    metrics.submitAppAttempt(USER);
     appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
         .gaugeInt(APPS_PENDING, 1)
         .checkAgainst(queueSource, true);
@@ -80,7 +102,7 @@ public class TestQueueMetrics {
     metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
         Resources.createResource(100*GB, 100));
     metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
-        user, 5, Resources.createResource(3*GB, 3));
+        USER, 5, Resources.createResource(3*GB, 3));
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
     ResourceMetricsChecker rmChecker = ResourceMetricsChecker.create()
@@ -91,14 +113,14 @@ public class TestQueueMetrics {
         .gaugeInt(PENDING_CONTAINERS, 5)
         .checkAgainst(queueSource);
 
-    metrics.runAppAttempt(app.getApplicationId(), user);
+    metrics.runAppAttempt(app.getApplicationId(), USER);
     appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
         .gaugeInt(APPS_PENDING, 0)
         .gaugeInt(APPS_RUNNING, 1)
         .checkAgainst(queueSource, true);
 
     metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
-        user, 3, Resources.createResource(2*GB, 2), true);
+        USER, 3, Resources.createResource(2*GB, 2), true);
     rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
         .gaugeLong(ALLOCATED_MB, 6 * GB)
         .gaugeInt(ALLOCATED_V_CORES, 6)
@@ -110,7 +132,7 @@ public class TestQueueMetrics {
         .checkAgainst(queueSource);
 
     metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
-        user, 1, Resources.createResource(2*GB, 2));
+        USER, 1, Resources.createResource(2*GB, 2));
     rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
         .gaugeLong(ALLOCATED_MB, 4 * GB)
         .gaugeInt(ALLOCATED_V_CORES, 4)
@@ -119,13 +141,13 @@ public class TestQueueMetrics {
         .checkAgainst(queueSource);
 
     metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
-        user, 0, Resources.createResource(2 * GB, 2));
+        USER, 0, Resources.createResource(2 * GB, 2));
     //nothing should change in values
     rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
         .checkAgainst(queueSource);
 
     metrics.decrPendingResources(RMNodeLabelsManager.NO_LABEL,
-        user, 0, Resources.createResource(2 * GB, 2));
+        USER, 0, Resources.createResource(2 * GB, 2));
     //nothing should change in values
     ResourceMetricsChecker.createFromChecker(rmChecker)
         .checkAgainst(queueSource);
@@ -136,7 +158,7 @@ public class TestQueueMetrics {
         .counter(APPS_SUBMITTED, 1)
         .gaugeInt(APPS_RUNNING, 0)
         .checkAgainst(queueSource, true);
-    metrics.finishApp(user, RMAppState.FINISHED);
+    metrics.finishApp(USER, RMAppState.FINISHED);
     AppMetricsChecker.createFromChecker(appMetricsChecker)
         .counter(APPS_COMPLETED, 1)
         .checkAgainst(queueSource, true);
@@ -146,24 +168,23 @@ public class TestQueueMetrics {
   @Test
   public void testQueueAppMetricsForMultipleFailures() {
     String queueName = "single";
-    String user = "alice";
 
     QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false,
         new Configuration());
     MetricsSource queueSource = queueSource(ms, queueName);
-    AppSchedulingInfo app = mockApp(user);
+    AppSchedulingInfo app = mockApp(USER);
 
-    metrics.submitApp(user);
-    MetricsSource userSource = userSource(ms, queueName, user);
+    metrics.submitApp(USER);
+    MetricsSource userSource = userSource(ms, queueName, USER);
     AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
         .counter(APPS_SUBMITTED, 1)
         .checkAgainst(queueSource, true);
-    metrics.submitAppAttempt(user);
+    metrics.submitAppAttempt(USER);
     appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
         .gaugeInt(APPS_PENDING, 1)
         .checkAgainst(queueSource, true);
 
-    metrics.runAppAttempt(app.getApplicationId(), user);
+    metrics.runAppAttempt(app.getApplicationId(), USER);
     appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
         .gaugeInt(APPS_PENDING, 0)
         .gaugeInt(APPS_RUNNING, 1)
@@ -177,12 +198,12 @@ public class TestQueueMetrics {
 
     // As the application has failed, framework retries the same application
     // based on configuration
-    metrics.submitAppAttempt(user);
+    metrics.submitAppAttempt(USER);
     appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
         .gaugeInt(APPS_PENDING, 1)
         .checkAgainst(queueSource, true);
 
-    metrics.runAppAttempt(app.getApplicationId(), user);
+    metrics.runAppAttempt(app.getApplicationId(), USER);
     appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
         .gaugeInt(APPS_PENDING, 0)
         .gaugeInt(APPS_RUNNING, 1)
@@ -197,12 +218,12 @@ public class TestQueueMetrics {
 
     // As the application has failed, framework retries the same application
     // based on configuration
-    metrics.submitAppAttempt(user);
+    metrics.submitAppAttempt(USER);
     appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
         .gaugeInt(APPS_PENDING, 1)
         .checkAgainst(queueSource, true);
 
-    metrics.runAppAttempt(app.getApplicationId(), user);
+    metrics.runAppAttempt(app.getApplicationId(), USER);
     appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
         .gaugeInt(APPS_PENDING, 0)
         .gaugeInt(APPS_RUNNING, 1)
@@ -215,7 +236,7 @@ public class TestQueueMetrics {
         .gaugeInt(APPS_RUNNING, 0)
         .checkAgainst(queueSource, true);
 
-    metrics.finishApp(user, RMAppState.FAILED);
+    metrics.finishApp(USER, RMAppState.FAILED);
     AppMetricsChecker.createFromChecker(appMetricsChecker)
         .gaugeInt(APPS_RUNNING, 0)
         .counter(APPS_FAILED, 1)
@@ -227,15 +248,14 @@ public class TestQueueMetrics {
   @Test
   public void testSingleQueueWithUserMetrics() {
     String queueName = "single2";
-    String user = "dodo";
 
     QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, true,
 						 conf);
     MetricsSource queueSource = queueSource(ms, queueName);
-    AppSchedulingInfo app = mockApp(user);
+    AppSchedulingInfo app = mockApp(USER_2);
 
-    metrics.submitApp(user);
-    MetricsSource userSource = userSource(ms, queueName, user);
+    metrics.submitApp(USER_2);
+    MetricsSource userSource = userSource(ms, queueName, USER_2);
 
     AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
         .counter(APPS_SUBMITTED, 1)
@@ -244,7 +264,7 @@ public class TestQueueMetrics {
         .counter(APPS_SUBMITTED, 1)
         .checkAgainst(userSource, true);
 
-    metrics.submitAppAttempt(user);
+    metrics.submitAppAttempt(USER_2);
     appMetricsQueueSourceChecker = AppMetricsChecker
         .createFromChecker(appMetricsQueueSourceChecker)
         .gaugeInt(APPS_PENDING, 1)
@@ -257,9 +277,9 @@ public class TestQueueMetrics {
     metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
         Resources.createResource(100*GB, 100));
     metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
-        user, Resources.createResource(10*GB, 10));
+        USER_2, Resources.createResource(10*GB, 10));
     metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
-        user, 5, Resources.createResource(3*GB, 3));
+        USER_2, 5, Resources.createResource(3*GB, 3));
 
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
@@ -280,7 +300,7 @@ public class TestQueueMetrics {
             .gaugeInt(PENDING_CONTAINERS, 5)
             .checkAgainst(userSource);
 
-    metrics.runAppAttempt(app.getApplicationId(), user);
+    metrics.runAppAttempt(app.getApplicationId(), USER_2);
     appMetricsQueueSourceChecker = AppMetricsChecker
         .createFromChecker(appMetricsQueueSourceChecker)
             .gaugeInt(APPS_PENDING, 0)
@@ -293,7 +313,7 @@ public class TestQueueMetrics {
             .checkAgainst(userSource, true);
 
     metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
-        user, 3, Resources.createResource(2*GB, 2), true);
+        USER_2, 3, Resources.createResource(2*GB, 2), true);
     resMetricsQueueSourceChecker =
         ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
             .gaugeLong(ALLOCATED_MB, 6 * GB)
@@ -316,7 +336,7 @@ public class TestQueueMetrics {
             .checkAgainst(userSource);
 
     metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
-        user, 1, Resources.createResource(2*GB, 2));
+        USER_2, 1, Resources.createResource(2*GB, 2));
     ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
             .gaugeLong(ALLOCATED_MB, 4 * GB)
             .gaugeInt(ALLOCATED_V_CORES, 4)
@@ -340,7 +360,7 @@ public class TestQueueMetrics {
         AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
             .gaugeInt(APPS_RUNNING, 0)
             .checkAgainst(userSource, true);
-    metrics.finishApp(user, RMAppState.FINISHED);
+    metrics.finishApp(USER_2, RMAppState.FINISHED);
     AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
         .counter(APPS_COMPLETED, 1)
         .checkAgainst(queueSource, true);
@@ -353,7 +373,6 @@ public class TestQueueMetrics {
   public void testNodeTypeMetrics() {
     String parentQueueName = "root";
     String leafQueueName = "root.leaf";
-    String user = "alice";
 
     QueueMetrics parentMetrics =
       QueueMetrics.forQueue(ms, parentQueueName, null, true, conf);
@@ -365,29 +384,29 @@ public class TestQueueMetrics {
     MetricsSource queueSource = queueSource(ms, leafQueueName);
     //AppSchedulingInfo app = mockApp(user);
 
-    metrics.submitApp(user);
-    MetricsSource userSource = userSource(ms, leafQueueName, user);
-    MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
+    metrics.submitApp(USER);
+    MetricsSource userSource = userSource(ms, leafQueueName, USER);
+    MetricsSource parentUserSource = userSource(ms, parentQueueName, USER);
 
-    metrics.incrNodeTypeAggregations(user, NodeType.NODE_LOCAL);
+    metrics.incrNodeTypeAggregations(USER, NodeType.NODE_LOCAL);
     checkAggregatedNodeTypes(queueSource, 1L, 0L, 0L);
     checkAggregatedNodeTypes(parentQueueSource, 1L, 0L, 0L);
     checkAggregatedNodeTypes(userSource, 1L, 0L, 0L);
     checkAggregatedNodeTypes(parentUserSource, 1L, 0L, 0L);
 
-    metrics.incrNodeTypeAggregations(user, NodeType.RACK_LOCAL);
+    metrics.incrNodeTypeAggregations(USER, NodeType.RACK_LOCAL);
     checkAggregatedNodeTypes(queueSource, 1L, 1L, 0L);
     checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 0L);
     checkAggregatedNodeTypes(userSource, 1L, 1L, 0L);
     checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 0L);
 
-    metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH);
+    metrics.incrNodeTypeAggregations(USER, NodeType.OFF_SWITCH);
     checkAggregatedNodeTypes(queueSource, 1L, 1L, 1L);
     checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 1L);
     checkAggregatedNodeTypes(userSource, 1L, 1L, 1L);
     checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 1L);
 
-    metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH);
+    metrics.incrNodeTypeAggregations(USER, NodeType.OFF_SWITCH);
     checkAggregatedNodeTypes(queueSource, 1L, 1L, 2L);
     checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 2L);
     checkAggregatedNodeTypes(userSource, 1L, 1L, 2L);
@@ -396,67 +415,60 @@ public class TestQueueMetrics {
 
   @Test
   public void testTwoLevelWithUserMetrics() {
-    String parentQueueName = "root";
-    String leafQueueName = "root.leaf";
-    String user = "alice";
+    AppSchedulingInfo app = mockApp(USER);
 
-    QueueMetrics parentMetrics =
-      QueueMetrics.forQueue(ms, parentQueueName, null, true, conf);
-    Queue parentQueue = mock(Queue.class);
-    when(parentQueue.getMetrics()).thenReturn(parentMetrics);
-    QueueMetrics metrics =
-      QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, conf);
-    MetricsSource parentQueueSource = queueSource(ms, parentQueueName);
-    MetricsSource queueSource = queueSource(ms, leafQueueName);
-    AppSchedulingInfo app = mockApp(user);
-
-    metrics.submitApp(user);
-    MetricsSource userSource = userSource(ms, leafQueueName, user);
-    MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
+    QueueInfo root = new QueueInfo(null, "root", ms, conf, USER);
+    QueueInfo leaf = new QueueInfo(root, "root.leaf", ms, conf, USER);
+    leaf.queueMetrics.submitApp(USER);
 
     AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
         .counter(APPS_SUBMITTED, 1)
-        .checkAgainst(queueSource, true);
+        .checkAgainst(leaf.queueSource, true);
     AppMetricsChecker appMetricsParentQueueSourceChecker =
         AppMetricsChecker.create()
         .counter(APPS_SUBMITTED, 1)
-        .checkAgainst(parentQueueSource, true);
+        .checkAgainst(root.queueSource, true);
     AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker.create()
         .counter(APPS_SUBMITTED, 1)
-        .checkAgainst(userSource, true);
+        .checkAgainst(leaf.userSource, true);
     AppMetricsChecker appMetricsParentUserSourceChecker =
         AppMetricsChecker.create()
         .counter(APPS_SUBMITTED, 1)
-        .checkAgainst(parentUserSource, true);
+        .checkAgainst(root.userSource, true);
 
-    metrics.submitAppAttempt(user);
+    leaf.queueMetrics.submitAppAttempt(USER);
     appMetricsQueueSourceChecker =
         AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
         .gaugeInt(APPS_PENDING, 1)
-        .checkAgainst(queueSource, true);
+        .checkAgainst(leaf.queueSource, true);
     appMetricsParentQueueSourceChecker =
         AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker)
         .gaugeInt(APPS_PENDING, 1)
-        .checkAgainst(parentQueueSource, true);
+        .checkAgainst(root.queueSource, true);
     appMetricsUserSourceChecker =
         AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
         .gaugeInt(APPS_PENDING, 1)
-        .checkAgainst(userSource, true);
+        .checkAgainst(leaf.userSource, true);
     appMetricsParentUserSourceChecker =
         AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker)
         .gaugeInt(APPS_PENDING, 1)
-        .checkAgainst(parentUserSource, true);
+        .checkAgainst(root.userSource, true);
 
-    parentMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
+    root.queueMetrics.setAvailableResourcesToQueue(
+        RMNodeLabelsManager.NO_LABEL,
         Resources.createResource(100*GB, 100));
-    metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
+    leaf.queueMetrics.setAvailableResourcesToQueue(
+        RMNodeLabelsManager.NO_LABEL,
         Resources.createResource(100*GB, 100));
-    parentMetrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
-        user, Resources.createResource(10*GB, 10));
-    metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
-        user, Resources.createResource(10*GB, 10));
-    metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
-        user, 5, Resources.createResource(3*GB, 3));
+    root.queueMetrics.setAvailableResourcesToUser(
+        RMNodeLabelsManager.NO_LABEL,
+        USER, Resources.createResource(10*GB, 10));
+    leaf.queueMetrics.setAvailableResourcesToUser(
+        RMNodeLabelsManager.NO_LABEL,
+        USER, Resources.createResource(10*GB, 10));
+    leaf.queueMetrics.incrPendingResources(
+        RMNodeLabelsManager.NO_LABEL,
+        USER, 5, Resources.createResource(3*GB, 3));
 
     ResourceMetricsChecker resMetricsQueueSourceChecker =
         ResourceMetricsChecker.create()
@@ -465,7 +477,7 @@ public class TestQueueMetrics {
         .gaugeLong(PENDING_MB, 15 * GB)
         .gaugeInt(PENDING_V_CORES, 15)
         .gaugeInt(PENDING_CONTAINERS, 5)
-        .checkAgainst(queueSource);
+        .checkAgainst(leaf.queueSource);
     ResourceMetricsChecker resMetricsParentQueueSourceChecker =
         ResourceMetricsChecker.create()
             .gaugeLong(AVAILABLE_MB, 100 * GB)
@@ -473,7 +485,7 @@ public class TestQueueMetrics {
             .gaugeLong(PENDING_MB, 15 * GB)
             .gaugeInt(PENDING_V_CORES, 15)
             .gaugeInt(PENDING_CONTAINERS, 5)
-            .checkAgainst(parentQueueSource);
+            .checkAgainst(root.queueSource);
     ResourceMetricsChecker resMetricsUserSourceChecker =
         ResourceMetricsChecker.create()
             .gaugeLong(AVAILABLE_MB, 10 * GB)
@@ -481,7 +493,7 @@ public class TestQueueMetrics {
             .gaugeLong(PENDING_MB, 15 * GB)
             .gaugeInt(PENDING_V_CORES, 15)
             .gaugeInt(PENDING_CONTAINERS, 5)
-            .checkAgainst(userSource);
+            .checkAgainst(leaf.userSource);
     ResourceMetricsChecker resMetricsParentUserSourceChecker =
         ResourceMetricsChecker.create()
             .gaugeLong(AVAILABLE_MB, 10 * GB)
@@ -489,24 +501,24 @@ public class TestQueueMetrics {
             .gaugeLong(PENDING_MB, 15 * GB)
             .gaugeInt(PENDING_V_CORES, 15)
             .gaugeInt(PENDING_CONTAINERS, 5)
-            .checkAgainst(parentUserSource);
+            .checkAgainst(root.userSource);
 
-    metrics.runAppAttempt(app.getApplicationId(), user);
+    leaf.queueMetrics.runAppAttempt(app.getApplicationId(), USER);
     appMetricsQueueSourceChecker =
         AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
             .gaugeInt(APPS_PENDING, 0)
             .gaugeInt(APPS_RUNNING, 1)
-            .checkAgainst(queueSource, true);
+            .checkAgainst(leaf.queueSource, true);
     appMetricsUserSourceChecker =
         AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
             .gaugeInt(APPS_PENDING, 0)
             .gaugeInt(APPS_RUNNING, 1)
-            .checkAgainst(userSource, true);
+            .checkAgainst(leaf.userSource, true);
 
-    metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
-        user, 3, Resources.createResource(2*GB, 2), true);
-    metrics.reserveResource(RMNodeLabelsManager.NO_LABEL,
-        user, Resources.createResource(3*GB, 3));
+    leaf.queueMetrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
+        USER, 3, Resources.createResource(2*GB, 2), true);
+    leaf.queueMetrics.reserveResource(RMNodeLabelsManager.NO_LABEL,
+        USER, Resources.createResource(3*GB, 3));
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
     resMetricsQueueSourceChecker =
@@ -521,7 +533,7 @@ public class TestQueueMetrics {
             .gaugeLong(RESERVED_MB, 3 * GB)
             .gaugeInt(RESERVED_V_CORES, 3)
             .gaugeInt(RESERVED_CONTAINERS, 1)
-            .checkAgainst(queueSource);
+            .checkAgainst(leaf.queueSource);
     resMetricsParentQueueSourceChecker =
         ResourceMetricsChecker
             .createFromChecker(resMetricsParentQueueSourceChecker)
@@ -535,7 +547,7 @@ public class TestQueueMetrics {
             .gaugeLong(RESERVED_MB, 3 * GB)
             .gaugeInt(RESERVED_V_CORES, 3)
             .gaugeInt(RESERVED_CONTAINERS, 1)
-            .checkAgainst(parentQueueSource);
+            .checkAgainst(root.queueSource);
     resMetricsUserSourceChecker =
         ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
             .gaugeLong(ALLOCATED_MB, 6 * GB)
@@ -548,7 +560,7 @@ public class TestQueueMetrics {
             .gaugeLong(RESERVED_MB, 3 * GB)
             .gaugeInt(RESERVED_V_CORES, 3)
             .gaugeInt(RESERVED_CONTAINERS, 1)
-        .checkAgainst(userSource);
+        .checkAgainst(leaf.userSource);
     resMetricsParentUserSourceChecker = ResourceMetricsChecker
         .createFromChecker(resMetricsParentUserSourceChecker)
             .gaugeLong(ALLOCATED_MB, 6 * GB)
@@ -561,12 +573,12 @@ public class TestQueueMetrics {
             .gaugeLong(RESERVED_MB, 3 * GB)
             .gaugeInt(RESERVED_V_CORES, 3)
             .gaugeInt(RESERVED_CONTAINERS, 1)
-            .checkAgainst(parentUserSource);
+            .checkAgainst(root.userSource);
 
-    metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
-        user, 1, Resources.createResource(2*GB, 2));
-    metrics.unreserveResource(RMNodeLabelsManager.NO_LABEL,
-          user, Resources.createResource(3*GB, 3));
+    leaf.queueMetrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
+        USER, 1, Resources.createResource(2*GB, 2));
+    leaf.queueMetrics.unreserveResource(RMNodeLabelsManager.NO_LABEL,
+        USER, Resources.createResource(3*GB, 3));
     ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
         .gaugeLong(ALLOCATED_MB, 4 * GB)
         .gaugeInt(ALLOCATED_V_CORES, 4)
@@ -575,7 +587,7 @@ public class TestQueueMetrics {
         .gaugeLong(RESERVED_MB, 0)
         .gaugeInt(RESERVED_V_CORES, 0)
         .gaugeInt(RESERVED_CONTAINERS, 0)
-        .checkAgainst(queueSource);
+        .checkAgainst(leaf.queueSource);
     ResourceMetricsChecker.createFromChecker(resMetricsParentQueueSourceChecker)
         .gaugeLong(ALLOCATED_MB, 4 * GB)
         .gaugeInt(ALLOCATED_V_CORES, 4)
@@ -584,7 +596,7 @@ public class TestQueueMetrics {
         .gaugeLong(RESERVED_MB, 0)
         .gaugeInt(RESERVED_V_CORES, 0)
         .gaugeInt(RESERVED_CONTAINERS, 0)
-        .checkAgainst(parentQueueSource);
+        .checkAgainst(root.queueSource);
     ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
         .gaugeLong(ALLOCATED_MB, 4 * GB)
         .gaugeInt(ALLOCATED_V_CORES, 4)
@@ -593,7 +605,7 @@ public class TestQueueMetrics {
         .gaugeLong(RESERVED_MB, 0)
         .gaugeInt(RESERVED_V_CORES, 0)
         .gaugeInt(RESERVED_CONTAINERS, 0)
-        .checkAgainst(userSource);
+        .checkAgainst(leaf.userSource);
     ResourceMetricsChecker.createFromChecker(resMetricsParentUserSourceChecker)
         .gaugeLong(ALLOCATED_MB, 4 * GB)
         .gaugeInt(ALLOCATED_V_CORES, 4)
@@ -602,46 +614,46 @@ public class TestQueueMetrics {
         .gaugeLong(RESERVED_MB, 0)
         .gaugeInt(RESERVED_V_CORES, 0)
         .gaugeInt(RESERVED_CONTAINERS, 0)
-        .checkAgainst(parentUserSource);
+        .checkAgainst(root.userSource);
 
-    metrics.finishAppAttempt(
+    leaf.queueMetrics.finishAppAttempt(
         app.getApplicationId(), app.isPending(), app.getUser());
     appMetricsQueueSourceChecker = AppMetricsChecker
         .createFromChecker(appMetricsQueueSourceChecker)
             .counter(APPS_SUBMITTED, 1)
             .gaugeInt(APPS_RUNNING, 0)
-            .checkAgainst(queueSource, true);
+            .checkAgainst(leaf.queueSource, true);
     appMetricsParentQueueSourceChecker = AppMetricsChecker
             .createFromChecker(appMetricsParentQueueSourceChecker)
             .counter(APPS_SUBMITTED, 1)
             .gaugeInt(APPS_PENDING, 0)
             .gaugeInt(APPS_RUNNING, 0)
-            .checkAgainst(parentQueueSource, true);
+            .checkAgainst(root.queueSource, true);
     appMetricsUserSourceChecker = AppMetricsChecker
             .createFromChecker(appMetricsUserSourceChecker)
             .counter(APPS_SUBMITTED, 1)
             .gaugeInt(APPS_RUNNING, 0)
-            .checkAgainst(userSource, true);
+            .checkAgainst(leaf.userSource, true);
     appMetricsParentUserSourceChecker = AppMetricsChecker
             .createFromChecker(appMetricsParentUserSourceChecker)
             .counter(APPS_SUBMITTED, 1)
             .gaugeInt(APPS_PENDING, 0)
             .gaugeInt(APPS_RUNNING, 0)
-            .checkAgainst(parentUserSource, true);
+            .checkAgainst(root.userSource, true);
 
-    metrics.finishApp(user, RMAppState.FINISHED);
+    leaf.queueMetrics.finishApp(USER, RMAppState.FINISHED);
     AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
         .counter(APPS_COMPLETED, 1)
-        .checkAgainst(queueSource, true);
+        .checkAgainst(leaf.queueSource, true);
     AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker)
         .counter(APPS_COMPLETED, 1)
-        .checkAgainst(parentQueueSource, true);
+        .checkAgainst(root.queueSource, true);
     AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
         .counter(APPS_COMPLETED, 1)
-        .checkAgainst(userSource, true);
+        .checkAgainst(leaf.userSource, true);
     AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker)
         .counter(APPS_COMPLETED, 1)
-        .checkAgainst(parentUserSource, true);
+        .checkAgainst(root.userSource, true);
   }
   
   @Test 
@@ -719,7 +731,7 @@ public class TestQueueMetrics {
     assertCounter("AggregateOffSwitchContainersAllocated", offSwitch, rb);
   }
 
-  private static AppSchedulingInfo mockApp(String user) {
+  static AppSchedulingInfo mockApp(String user) {
     AppSchedulingInfo app = mock(AppSchedulingInfo.class);
     when(app.getUser()).thenReturn(user);
     ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
@@ -732,7 +744,7 @@ public class TestQueueMetrics {
     return ms.getSource(QueueMetrics.sourceName(queue).toString());
   }
 
-  private static MetricsSource userSource(MetricsSystem ms, String queue,
+  public static MetricsSource userSource(MetricsSystem ms, String queue,
       String user) {
     return ms.getSource(QueueMetrics.sourceName(queue).
         append(",user=").append(user).toString());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java
new file mode 100644
index 0000000..76a9849
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java
@@ -0,0 +1,645 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .QueueMetricsForCustomResources.QueueMetricsCustomResource;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES;
+import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper
+    .extractCustomResourcesAsStrings;
+import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper.newResource;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_VCORE_SECONDS_PREEMPTED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics.queueSource;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestQueueMetricsForCustomResources {
+  /**
+   * Kinds of custom resource metrics asserted by the helpers of this class.
+   * The helpers below only use the value to label assertion messages.
+   */
+  public enum MetricsForCustomResource {
+    ALLOCATED,
+    AVAILABLE,
+    PENDING,
+    RESERVED,
+    AGGREGATE_PREEMPTED_SECONDS
+  }
+
+  // One gigabyte expressed in megabytes.
+  public static final long GB = 1024; // MB
+  // Configuration handed to every queue created by this test class.
+  private static final Configuration CONF = new Configuration();
+  // Names of the two custom resource types registered in
+  // initializeResourceTypes().
+  private static final String CUSTOM_RES_1 = "custom_res_1";
+  private static final String CUSTOM_RES_2 = "custom_res_2";
+  // User name passed to the queues and to the test data built below.
+  public static final String USER = "alice";
+  // Resource used by most tests; assigned by createDefaultResource().
+  private Resource defaultResource;
+  // Metrics system instance; a new one is created in setUp().
+  private MetricsSystem ms;
+
+  /**
+   * Builds a fresh fixture before each test: calls
+   * {@code QueueMetrics.clearQueueMetrics()}, creates a new metrics system,
+   * registers the resource types and creates the default resource.
+   */
+  @Before
+  public void setUp() {
+    QueueMetrics.clearQueueMetrics();
+    ms = new MetricsSystemImpl();
+    initializeResourceTypes();
+    createDefaultResource();
+  }
+
+  private void createDefaultResource() {
+    defaultResource = newResource(4 * GB, 4,
+        ImmutableMap.<String, String> builder()
+            .put(CUSTOM_RES_1, String.valueOf(15 * GB))
+            .put(CUSTOM_RES_2, String.valueOf(20 * GB))
+            .build());
+  }
+
+  private void initializeResourceTypes() {
+    Map<String, ResourceInformation> riMap = new HashMap<>();
+
+    ResourceInformation memory = ResourceInformation.newInstance(
+        ResourceInformation.MEMORY_MB.getName(),
+        ResourceInformation.MEMORY_MB.getUnits(),
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    ResourceInformation vcores = ResourceInformation.newInstance(
+        ResourceInformation.VCORES.getName(),
+        ResourceInformation.VCORES.getUnits(),
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+        DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+    ResourceInformation res1 = ResourceInformation.newInstance(CUSTOM_RES_1,
+        ResourceInformation.VCORES.getUnits(), 0, 2000);
+    ResourceInformation res2 = ResourceInformation.newInstance(CUSTOM_RES_2,
+        ResourceInformation.VCORES.getUnits(), 0, 2000);
+
+    riMap.put(ResourceInformation.MEMORY_URI, memory);
+    riMap.put(ResourceInformation.VCORES_URI, vcores);
+    riMap.put(CUSTOM_RES_1, res1);
+    riMap.put(CUSTOM_RES_2, res2);
+    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+  }
+
+  private static void assertCustomResourceValue(QueueMetrics metrics,
+      MetricsForCustomResource metricsType,
+      Function<QueueMetrics, Resource> func,
+      String resourceName,
+      long expectedValue) {
+    Resource res = func.apply(metrics);
+    Long value = res.getResourceValue(resourceName);
+    assertCustomResourceValueInternal(metricsType, resourceName,
+        expectedValue, value);
+  }
+
+  private static void assertCustomResourceValueInternal(
+      MetricsForCustomResource metricsType, String resourceName, long
+      expectedValue, Long value) {
+    assertNotNull(
+            "QueueMetrics should have custom resource metrics value " +
+                "for resource: " + resourceName, value);
+    assertEquals(String.format(
+            "QueueMetrics should have custom resource metrics value %d " +
+                "for resource: %s for metrics type %s",
+            expectedValue, resourceName, metricsType), expectedValue,
+            (long) value);
+  }
+
+  private static Map<String, String> getCustomResourcesWithValue(long value) {
+    return ImmutableMap.<String, String>builder()
+        .put(CUSTOM_RES_1, String.valueOf(value))
+        .put(CUSTOM_RES_2, String.valueOf(value))
+        .build();
+  }
+
+  /**
+   * Creates four chained queues ("root", "root.subQ", "root.subQ2" and
+   * "root.subQ2.leafQ"), each one receiving the previous queue as its first
+   * constructor argument, and returns the last one.
+   */
+  private QueueInfo createFourLevelQueueHierarchy() {
+    // NOTE(review): "root.subQ2" is created under "root.subQ" although its
+    // name suggests a direct child of root - confirm this is intended.
+    QueueInfo level1 = new QueueInfo(null, "root", ms, CONF, USER);
+    QueueInfo level2 = new QueueInfo(level1, "root.subQ", ms, CONF, USER);
+    QueueInfo level3 = new QueueInfo(level2, "root.subQ2", ms, CONF, USER);
+    QueueInfo level4 =
+        new QueueInfo(level3, "root.subQ2.leafQ", ms, CONF, USER);
+    return level4;
+  }
+
+  private QueueInfo createBasicQueueHierarchy() {
+    QueueInfo root = new QueueInfo(null, "root", ms, CONF, USER);
+    return new QueueInfo(root, "root.leaf", ms, CONF, USER);
+  }
+
+  /**
+   * Returns the default test data builder after {@code withContainers} has
+   * been called on it with {@code containers}.
+   */
+  private QueueMetricsTestData.Builder
+      createQueueMetricsTestDataWithContainers(int containers) {
+    QueueMetricsTestData.Builder builder = createDefaultQueueMetricsTestData();
+    return builder.withContainers(containers);
+  }
+
+  private QueueMetricsTestData.Builder createDefaultQueueMetricsTestData() {
+    return QueueMetricsTestData.Builder.create()
+        .withUser(USER)
+        .withPartition(RMNodeLabelsManager.NO_LABEL);
+  }
+
+  private void testIncreasePendingResources(QueueMetricsTestData testData) {
+    testIncreasePendingResourcesInternal(testData.containers, testData);
+  }
+
+  private void testIncreasePendingResourcesWithoutContainer(
+      QueueMetricsTestData testData) {
+    testIncreasePendingResourcesInternal(1, testData);
+  }
+
+  private void testIncreasePendingResourcesInternal(int containers,
+      QueueMetricsTestData testData) {
+    testData.leafQueue.queueMetrics.incrPendingResources(testData.partition,
+        testData.user, containers, testData.resource);
+
+    ResourceMetricsChecker checker = ResourceMetricsChecker
+        .create()
+        .gaugeInt(PENDING_CONTAINERS, containers)
+        .gaugeLong(PENDING_MB, containers *
+            testData.resource.getMemorySize())
+        .gaugeInt(PENDING_V_CORES, containers *
+            testData.resource.getVirtualCores());
+    assertAllMetrics(testData.leafQueue, checker,
+        QueueMetrics::getPendingResources,
+        MetricsForCustomResource.PENDING, computeExpectedCustomResourceValues(
+            testData.customResourceValues, (k, v) -> v * containers));
+  }
+
+  private void testAllocateResources(boolean decreasePending,
+      QueueMetricsTestData testData) {
+    testData.leafQueue.queueMetrics.allocateResources(testData.partition,
+        testData.user, testData.containers, testData.resource, decreasePending);
+
+    ResourceMetricsChecker checker = ResourceMetricsChecker
+        .create()
+        .gaugeInt(ALLOCATED_CONTAINERS, testData.containers)
+        .counter(AGGREGATE_CONTAINERS_ALLOCATED, testData.containers)
+        .gaugeLong(ALLOCATED_MB, testData.containers *
+            testData.resource.getMemorySize())
+        .gaugeInt(ALLOCATED_V_CORES, testData.containers *
+            testData.resource.getVirtualCores())
+        .gaugeInt(PENDING_CONTAINERS, 0)
+        .gaugeLong(PENDING_MB, 0)
+        .gaugeInt(PENDING_V_CORES, 0)
+        .checkAgainst(testData.leafQueue.queueSource);
+    if (decreasePending) {
+      assertAllMetrics(testData.leafQueue, checker,
+          QueueMetrics::getPendingResources,
+          MetricsForCustomResource.PENDING,
+          computeExpectedCustomResourceValues(testData.customResourceValues,
+              (k, v) -> 0L));
+    }
+    if (!testData.customResourceValues.isEmpty()) {
+      assertAllMetrics(testData.leafQueue, checker,
+          QueueMetrics::getAllocatedResources,
+          MetricsForCustomResource.ALLOCATED,
+          computeExpectedCustomResourceValues(testData.customResourceValues,
+              (k, v) -> v * testData.containers));
+    }
+  }
+
+  private void testUpdatePreemptedSeconds(QueueMetricsTestData testData,
+      int seconds) {
+    testData.leafQueue.queueMetrics.updatePreemptedMemoryMBSeconds(
+        testData.resource.getMemorySize() * seconds);
+    testData.leafQueue.queueMetrics.updatePreemptedVcoreSeconds(
+        testData.resource.getVirtualCores() * seconds);
+    testData.leafQueue.queueMetrics.updatePreemptedSecondsForCustomResources(
+        testData.resource, seconds);
+
+    ResourceMetricsChecker checker = ResourceMetricsChecker
+        .create()
+        .counter(AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED,
+            testData.resource.getMemorySize() * seconds)
+        .counter(AGGREGATE_VCORE_SECONDS_PREEMPTED,
+            testData.resource.getVirtualCores() * seconds);
+
+    assertQueueMetricsOnly(testData.leafQueue, checker,
+        this::convertPreemptedSecondsToResource,
+        MetricsForCustomResource.AGGREGATE_PREEMPTED_SECONDS,
+        computeExpectedCustomResourceValues(testData.customResourceValues,
+            (k, v) -> v * seconds));
+  }
+
+  /**
+   * Builds a {@link Resource} from the aggregate preempted-seconds values of
+   * the given queue metrics: the memory counter, the vcore counter and the
+   * custom resource values.
+   *
+   * @param qm queue metrics to read the preempted-seconds values from
+   * @return resource holding the three groups of values
+   * @throws ArithmeticException if the vcore counter does not fit in an int
+   */
+  private Resource convertPreemptedSecondsToResource(QueueMetrics qm) {
+    QueueMetricsCustomResource customValues = qm
+        .getAggregatedPreemptedSecondsResources();
+    MutableCounterLong vcoreSeconds = qm
+        .getAggregateVcoreSecondsPreempted();
+    MutableCounterLong memorySeconds = qm
+        .getAggregateMemoryMBSecondsPreempted();
+    // Math.toIntExact fails loudly where a plain (int) cast would silently
+    // truncate an out-of-range counter value.
+    return Resource.newInstance(
+        memorySeconds.value(), Math.toIntExact(vcoreSeconds.value()),
+        customValues.getValues());
+  }
+
+  private void testReserveResources(QueueMetricsTestData testData) {
+    testData.leafQueue.queueMetrics.reserveResource(testData.partition,
+        testData.user, testData.resource);
+
+    ResourceMetricsChecker checker = ResourceMetricsChecker
+        .create()
+        .gaugeInt(RESERVED_CONTAINERS, 1)
+        .gaugeLong(RESERVED_MB, testData.resource.getMemorySize())
+        .gaugeInt(RESERVED_V_CORES, testData.resource.getVirtualCores())
+        .checkAgainst(testData.leafQueue.queueSource);
+    assertAllMetrics(testData.leafQueue, checker,
+        QueueMetrics::getReservedResources,
+        MetricsForCustomResource.RESERVED,
+        computeExpectedCustomResourceValues(
+            testData.customResourceValues, (k, v) -> v));
+  }
+
+  /**
+   * Allocates resources without decreasing pending ones, then reads the
+   * allocated resources from the leaf queue metrics and, when the test data
+   * has custom resource values, asserts both custom resources equal their
+   * per-container value multiplied by the container count.
+   */
+  private void testGetAllocatedResources(QueueMetricsTestData testData) {
+    testAllocateResources(false, testData);
+
+    Resource allocated =
+        testData.leafQueue.queueMetrics.getAllocatedResources();
+    if (testData.customResourceValues.isEmpty()) {
+      return;
+    }
+    for (String resourceName : new String[] {CUSTOM_RES_1, CUSTOM_RES_2}) {
+      assertCustomResourceValueInternal(MetricsForCustomResource.ALLOCATED,
+          resourceName,
+          testData.customResourceValues.get(resourceName)
+              * testData.containers,
+          allocated.getResourceValue(resourceName));
+    }
+  }
+
+  private void assertAllMetrics(QueueInfo queueInfo,
+      ResourceMetricsChecker checker,
+      Function<QueueMetrics, Resource> func,
+      MetricsForCustomResource metricsType,
+      Map<String, Long> expectedCustomResourceValues) {
+    assertAllQueueMetrics(queueInfo, checker, func, metricsType,
+        expectedCustomResourceValues);
+
+    //assert leaf and root userSources
+    checker = ResourceMetricsChecker.createFromChecker(checker)
+        .checkAgainst(queueInfo.userSource);
+    ResourceMetricsChecker.createFromChecker(checker)
+        .checkAgainst(queueInfo.getRoot().userSource);
+  }
+
+  /**
+   * Verifies queue sources and queue metrics only; unlike
+   * {@code assertAllMetrics}, no user source is checked. Delegates to
+   * {@code assertAllQueueMetrics} with unchanged arguments.
+   */
+  private void assertQueueMetricsOnly(QueueInfo queueInfo,
+      ResourceMetricsChecker checker,
+      Function<QueueMetrics, Resource> func,
+      MetricsForCustomResource metricsType,
+      Map<String, Long> expectedCustomResourceValues) {
+    assertAllQueueMetrics(
+        queueInfo,
+        checker,
+        func,
+        metricsType,
+        expectedCustomResourceValues);
+  }
+
+  /**
+   * Checks every queue source reached through
+   * {@code queueInfo.checkAllQueueSources} against a copy of {@code checker},
+   * and asserts the expected value of both custom resources on every queue
+   * metrics reached through {@code queueInfo.checkAllQueueMetrics}.
+   */
+  private void assertAllQueueMetrics(QueueInfo queueInfo,
+      ResourceMetricsChecker checker,
+      Function<QueueMetrics, Resource> func,
+      MetricsForCustomResource metricsType,
+      Map<String, Long> expectedCustomResourceValues) {
+    // regular (memory / vcores / containers) metrics
+    queueInfo.checkAllQueueSources(source ->
+        ResourceMetricsChecker.createFromChecker(checker)
+            .checkAgainst(source));
+
+    // custom resource metrics
+    queueInfo.checkAllQueueMetrics(queueMetrics -> {
+      for (String resourceName : new String[] {CUSTOM_RES_1, CUSTOM_RES_2}) {
+        assertCustomResourceValue(queueMetrics, metricsType, func,
+            resourceName, expectedCustomResourceValues.get(resourceName));
+      }
+    });
+  }
+
+  private Map<String, Long> computeExpectedCustomResourceValues(
+      Map<String, Long> customResourceValues,
+      BiFunction<String, Long, Long> func) {
+    Map<String, Long> values = Maps.newHashMap();
+    for (Map.Entry<String, Long> res : customResourceValues.entrySet()) {
+      values.put(res.getKey(), func.apply(res.getKey(), res.getValue()));
+    }
+    return values;
+  }
+
+  /**
+   * Sets the available resources of a single queue through the one-argument
+   * {@code setAvailableResourcesToQueue} and verifies the available memory,
+   * vcores and custom resource values.
+   */
+  @Test
+  public void testSetAvailableResourcesToQueue1() {
+    final String queueName = "single";
+    final long customRes1 = 5 * GB;
+    final long customRes2 = 6 * GB;
+    QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null,
+        false, CONF);
+    MetricsSource source = queueSource(ms, queueName);
+
+    Resource available = newResource(GB, 4, ImmutableMap.of(
+        CUSTOM_RES_1, String.valueOf(customRes1),
+        CUSTOM_RES_2, String.valueOf(customRes2)));
+    metrics.setAvailableResourcesToQueue(available);
+
+    ResourceMetricsChecker.create()
+        .gaugeLong(AVAILABLE_MB, GB)
+        .gaugeInt(AVAILABLE_V_CORES, 4)
+        .checkAgainst(source);
+    assertCustomResourceValue(metrics,
+        MetricsForCustomResource.AVAILABLE,
+        QueueMetrics::getAvailableResources, CUSTOM_RES_1, customRes1);
+    assertCustomResourceValue(metrics,
+        MetricsForCustomResource.AVAILABLE,
+        QueueMetrics::getAvailableResources, CUSTOM_RES_2, customRes2);
+  }
+
+  /**
+   * Sets the available resources of a single queue through the two-argument
+   * {@code setAvailableResourcesToQueue}, passing {@code null} as the first
+   * argument, and verifies the available memory, vcores and custom resource
+   * values.
+   */
+  @Test
+  public void testSetAvailableResourcesToQueue2() {
+    final String queueName = "single";
+    final long customRes1 = 15 * GB;
+    final long customRes2 = 20 * GB;
+    QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null,
+        false, CONF);
+    MetricsSource source = queueSource(ms, queueName);
+
+    Resource available = newResource(GB, 4, ImmutableMap.of(
+        CUSTOM_RES_1, String.valueOf(customRes1),
+        CUSTOM_RES_2, String.valueOf(customRes2)));
+    metrics.setAvailableResourcesToQueue(null, available);
+
+    ResourceMetricsChecker.create()
+        .gaugeLong(AVAILABLE_MB, GB)
+        .gaugeInt(AVAILABLE_V_CORES, 4)
+        .checkAgainst(source);
+    assertCustomResourceValue(metrics,
+        MetricsForCustomResource.AVAILABLE,
+        QueueMetrics::getAvailableResources, CUSTOM_RES_1, customRes1);
+    assertCustomResourceValue(metrics,
+        MetricsForCustomResource.AVAILABLE,
+        QueueMetrics::getAvailableResources, CUSTOM_RES_2, customRes2);
+  }
+
+  /**
+   * Increases the pending resources of a two-level queue hierarchy by five
+   * containers of the default resource and verifies the pending metrics.
+   */
+  @Test
+  public void testIncreasePendingResources() {
+    QueueMetricsTestData.Builder builder =
+        createQueueMetricsTestDataWithContainers(5);
+    QueueInfo leafQueue = createBasicQueueHierarchy();
+    Resource resourceToDecrease =
+        newResource(GB, 2, getCustomResourcesWithValue(2 * GB));
+    QueueMetricsTestData testData = builder
+        .withLeafQueue(leafQueue)
+        .withResourceToDecrease(resourceToDecrease, 2)
+        .withResources(defaultResource)
+        .build();
+
+    testIncreasePendingResources(testData);
+  }
+
+  /**
+   * Increases the pending resources by five containers of the default
+   * resource, decreases them by two containers of a smaller resource and
+   * verifies the remaining pending containers, memory, vcores and custom
+   * resource values.
+   */
+  @Test
+  public void testDecreasePendingResources() {
+    final Resource resourceToDecrease =
+        newResource(GB, 2, getCustomResourcesWithValue(2 * GB));
+    final int containersToDecrease = 2;
+    final int containers = 5;
+    QueueMetricsTestData.Builder builder =
+        createQueueMetricsTestDataWithContainers(containers);
+    QueueInfo leafQueue = createBasicQueueHierarchy();
+    // NOTE(review): the second argument is 'containers', not
+    // 'containersToDecrease'; the decrease below only uses the local
+    // variables - confirm which value the builder is meant to receive.
+    QueueMetricsTestData testData = builder
+        .withLeafQueue(leafQueue)
+        .withResourceToDecrease(resourceToDecrease, containers)
+        .withResources(defaultResource)
+        .build();
+
+    // expected values once the decrease has been applied
+    final int vcoresPerDecreasedContainer =
+        resourceToDecrease.getVirtualCores();
+    final long memoryPerDecreasedContainer =
+        resourceToDecrease.getMemorySize();
+    final int expectedContainers = containers - containersToDecrease;
+    final int expectedVcores =
+        (defaultResource.getVirtualCores() * containers)
+            - (vcoresPerDecreasedContainer * containersToDecrease);
+    final long expectedMemory =
+        (defaultResource.getMemorySize() * containers)
+            - (memoryPerDecreasedContainer * containersToDecrease);
+
+    // pending resources have to exist before they can be decreased
+    testIncreasePendingResources(testData);
+
+    Resource decrement = newResource(memoryPerDecreasedContainer,
+        vcoresPerDecreasedContainer,
+        extractCustomResourcesAsStrings(resourceToDecrease));
+    testData.leafQueue.queueMetrics.decrPendingResources(testData.partition,
+        testData.user, containersToDecrease, decrement);
+
+    ResourceMetricsChecker checker = ResourceMetricsChecker
+        .create()
+        .gaugeInt(PENDING_CONTAINERS, expectedContainers)
+        .gaugeLong(PENDING_MB, expectedMemory)
+        .gaugeInt(PENDING_V_CORES, expectedVcores)
+        .checkAgainst(testData.leafQueue.queueSource);
+
+    final Map<String, Long> expectedCustomValues =
+        computeExpectedCustomResourceValues(testData.customResourceValues,
+            (k, v) -> v * containers - (resourceToDecrease.getResourceValue(k)
+                * containersToDecrease));
+    assertAllMetrics(testData.leafQueue, checker,
+        QueueMetrics::getPendingResources,
+        MetricsForCustomResource.PENDING, expectedCustomValues);
+  }
+
+  /**
+   * Allocates five containers of the default resource on a two-level queue
+   * hierarchy with the decrease-pending flag unset and verifies the metrics.
+   */
+  @Test
+  public void testAllocateResourcesWithoutDecreasePending() {
+    QueueMetricsTestData.Builder builder =
+        createQueueMetricsTestDataWithContainers(5);
+    QueueInfo leafQueue = createBasicQueueHierarchy();
+    QueueMetricsTestData testData = builder
+        .withLeafQueue(leafQueue)
+        .withResources(defaultResource)
+        .build();
+
+    testAllocateResources(false, testData);
+  }
+
+  /**
+   * Increases the pending resources by five containers of the default
+   * resource, then allocates the same amount with the decrease-pending flag
+   * set and verifies the metrics after each step.
+   */
+  @Test
+  public void testAllocateResourcesWithDecreasePending() {
+    QueueMetricsTestData.Builder builder =
+        createQueueMetricsTestDataWithContainers(5);
+    QueueInfo leafQueue = createBasicQueueHierarchy();
+    Resource resourceToDecrease =
+        newResource(GB, 2, getCustomResourcesWithValue(2 * GB));
+    QueueMetricsTestData testData = builder
+        .withLeafQueue(leafQueue)
+        .withResourceToDecrease(resourceToDecrease, 2)
+        .withResources(defaultResource)
+        .build();
+
+    // pending resources must exist before the allocation can decrease them
+    testIncreasePendingResources(testData);
+
+    // allocate and decrease the pending resources at the same time
+    testAllocateResources(true, testData);
+  }
+
+  /**
+   * Increases the pending resources by one container, then calls the
+   * {@code allocateResources} overload that takes no container count and
+   * verifies that allocated memory/vcores equal the resource, pending
+   * memory/vcores are 0 and the pending container count is still 1.
+   */
+  @Test
+  public void testAllocateResourcesWithoutContainer() {
+    QueueMetricsTestData.Builder builder = createDefaultQueueMetricsTestData();
+    QueueInfo leafQueue = createBasicQueueHierarchy();
+    QueueMetricsTestData testData = builder
+        .withLeafQueue(leafQueue)
+        .withResources(defaultResource)
+        .build();
+
+    // pending resources for a single container
+    testIncreasePendingResourcesWithoutContainer(testData);
+
+    final Resource resource = testData.resource;
+    final QueueInfo leaf = testData.leafQueue;
+    leaf.queueMetrics.allocateResources(testData.partition,
+        testData.user, resource);
+
+    ResourceMetricsChecker checker = ResourceMetricsChecker.create()
+        .gaugeLong(ALLOCATED_MB, resource.getMemorySize())
+        .gaugeInt(ALLOCATED_V_CORES, resource.getVirtualCores())
+        .gaugeInt(PENDING_CONTAINERS, 1)
+        .gaugeLong(PENDING_MB, 0)
+        .gaugeInt(PENDING_V_CORES, 0);
+
+    checker.checkAgainst(leaf.queueSource);
+    checker.checkAgainst(leaf.getRoot().queueSource);
+
+    final Map<String, Long> expectedPending =
+        computeExpectedCustomResourceValues(
+            testData.customResourceValues, (k, v) -> 0L);
+    assertAllMetrics(leaf, checker,
+        QueueMetrics::getPendingResources,
+        MetricsForCustomResource.PENDING, expectedPending);
+
+    final Map<String, Long> expectedAllocated =
+        computeExpectedCustomResourceValues(
+            testData.customResourceValues, (k, v) -> v);
+    assertAllMetrics(leaf, checker,
+        QueueMetrics::getAllocatedResources,
+        MetricsForCustomResource.ALLOCATED, expectedAllocated);
+  }
+
+  /**
+   * Allocates five containers of the default resource, releases all of them
+   * and verifies the aggregate allocated/released container counters and
+   * that the allocated custom resource values are back to 0.
+   */
+  @Test
+  public void testReleaseResources() {
+    final int containers = 5;
+    QueueMetricsTestData.Builder builder =
+        createQueueMetricsTestDataWithContainers(containers);
+    QueueInfo leafQueue = createBasicQueueHierarchy();
+    QueueMetricsTestData testData = builder
+        .withLeafQueue(leafQueue)
+        .withResourceToDecrease(defaultResource, containers)
+        .withResources(defaultResource)
+        .build();
+
+    // something has to be allocated before it can be released
+    testAllocateResources(false, testData);
+
+    final QueueInfo leaf = testData.leafQueue;
+    leaf.queueMetrics.releaseResources(testData.partition,
+        testData.user, containers, defaultResource);
+
+    ResourceMetricsChecker checker = ResourceMetricsChecker
+        .create()
+        .counter(AGGREGATE_CONTAINERS_ALLOCATED, containers)
+        .counter(AGGREGATE_CONTAINERS_RELEASED, containers)
+        .checkAgainst(leaf.queueSource);
+
+    final Map<String, Long> expectedAllocated =
+        computeExpectedCustomResourceValues(
+            testData.customResourceValues, (k, v) -> 0L);
+    assertAllMetrics(leaf, checker,
+        QueueMetrics::getAllocatedResources,
+        MetricsForCustomResource.ALLOCATED, expectedAllocated);
+  }
+
+  @Test
+  public void testUpdatePreemptedSecondsForCustomResources() {
+    final int preemptedSeconds = 1;
+    QueueMetricsTestData data = createQueueMetricsTestDataWithContainers(5)
+        .withLeafQueue(createFourLevelQueueHierarchy())
+        .withResources(defaultResource)
+        .build();
+    // One-second case, run against the four-level queue hierarchy.
+    testUpdatePreemptedSeconds(data, preemptedSeconds);
+  }
+
+  @Test
+  public void testUpdatePreemptedSecondsForCustomResourcesMoreSeconds() {
+    final int preemptedSeconds = 15;
+    QueueMetricsTestData data = createQueueMetricsTestDataWithContainers(5)
+        .withLeafQueue(createFourLevelQueueHierarchy())
+        .withResources(defaultResource)
+        .build();
+    // Multi-second case (15), run against the four-level queue hierarchy.
+    testUpdatePreemptedSeconds(data, preemptedSeconds);
+  }
+
+  @Test
+  public void testReserveResources() {
+    // Delegates to the overload that takes the prepared test data.
+    testReserveResources(
+        createQueueMetricsTestDataWithContainers(5)
+            .withLeafQueue(createBasicQueueHierarchy())
+            .withResources(defaultResource)
+            .build());
+  }
+
+  @Test
+  public void testUnreserveResources() {
+    QueueMetricsTestData data = createQueueMetricsTestDataWithContainers(5)
+        .withLeafQueue(createBasicQueueHierarchy())
+        .withResources(defaultResource)
+        .build();
+
+    // Run the reserve scenario first, then unreserve the same resource.
+    testReserveResources(data);
+    data.leafQueue.queueMetrics.unreserveResource(
+        data.partition, data.user, defaultResource);
+    // All three reserved gauges are expected to read zero afterwards.
+    ResourceMetricsChecker zeroReserved = ResourceMetricsChecker.create()
+        .gaugeInt(RESERVED_CONTAINERS, 0)
+        .gaugeLong(RESERVED_MB, 0)
+        .gaugeInt(RESERVED_V_CORES, 0);
+    ResourceMetricsChecker verified =
+        zeroReserved.checkAgainst(data.leafQueue.queueSource);
+    assertAllMetrics(data.leafQueue, verified,
+        QueueMetrics::getReservedResources,
+        MetricsForCustomResource.RESERVED, computeExpectedCustomResourceValues(
+            data.customResourceValues, (k, v) -> 0L));
+  }
+
+  @Test
+  public void testGetAllocatedResourcesWithCustomResources() {
+    // Test data is built from defaultResource on a basic queue hierarchy.
+    testGetAllocatedResources(
+        createQueueMetricsTestDataWithContainers(5)
+            .withLeafQueue(createBasicQueueHierarchy())
+            .withResources(defaultResource)
+            .build());
+  }
+
+  @Test
+  public void testGetAllocatedResourcesWithoutCustomResources() {
+    // Test data uses a resource created with Collections.emptyMap().
+    testGetAllocatedResources(
+        createQueueMetricsTestDataWithContainers(5)
+            .withResources(newResource(4 * GB, 4, Collections.emptyMap()))
+            .withLeafQueue(createBasicQueueHierarchy())
+            .build());
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org