You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2019/10/15 16:01:23 UTC

[hadoop] branch branch-2 updated: YARN-8750. Refactor TestQueueMetrics. (Contributed by Szilard Nemeth)

This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 600d54f  YARN-8750. Refactor TestQueueMetrics. (Contributed by Szilard Nemeth)
600d54f is described below

commit 600d54fcd6e1e4de2567bec66ee8124e360e872e
Author: Haibo Chen <ha...@apache.org>
AuthorDate: Thu Oct 4 13:00:31 2018 -0700

    YARN-8750. Refactor TestQueueMetrics. (Contributed by Szilard Nemeth)
    
    (cherry picked from commit e60b797c88541f94cecc7fdbcaad010c4742cfdb)
---
 .../scheduler/AppMetricsChecker.java               | 122 +++++
 .../scheduler/ResourceMetricsChecker.java          | 170 +++++++
 .../scheduler/TestQueueMetrics.java                | 550 +++++++++++++++------
 3 files changed, 701 insertions(+), 141 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppMetricsChecker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppMetricsChecker.java
new file mode 100644
index 0000000..8967234
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppMetricsChecker.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_COMPLETED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_FAILED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_KILLED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_PENDING;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_RUNNING;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_SUBMITTED;
+
+/**
+ * Fluent test helper that collects the expected values of a queue's
+ * application metrics and asserts them against a {@link MetricsSource}.
+ * <p>
+ * A checker from {@link #create()} expects every known application metric
+ * to be zero. {@link #createFromChecker(AppMetricsChecker)} copies the
+ * expectations of an existing checker, so a test only has to restate the
+ * values that changed since its previous check.
+ */
+final class AppMetricsChecker {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AppMetricsChecker.class);
+
+  /** Template expecting zero for every metric; only ever copied. */
+  private static final AppMetricsChecker INITIAL_CHECKER =
+      new AppMetricsChecker()
+        .counter(APPS_SUBMITTED, 0)
+        .gaugeInt(APPS_PENDING, 0)
+        .gaugeInt(APPS_RUNNING, 0)
+        .counter(APPS_COMPLETED, 0)
+        .counter(APPS_FAILED, 0)
+        .counter(APPS_KILLED, 0);
+
+  /** Application metrics that can be checked, with their metric names. */
+  enum AppMetricsKey {
+    APPS_SUBMITTED("AppsSubmitted"),
+    APPS_PENDING("AppsPending"),
+    APPS_RUNNING("AppsRunning"),
+    APPS_COMPLETED("AppsCompleted"),
+    APPS_FAILED("AppsFailed"),
+    APPS_KILLED("AppsKilled");
+
+    /** Name used to look the metric up in the metrics record. */
+    private final String value;
+
+    AppMetricsKey(String value) {
+      this.value = value;
+    }
+  }
+
+  /** Expected values asserted as int gauges. */
+  private final Map<AppMetricsKey, Integer> gaugesInt;
+  /** Expected values asserted as counters. */
+  private final Map<AppMetricsKey, Integer> counters;
+
+  private AppMetricsChecker() {
+    this.gaugesInt = Maps.newHashMap();
+    this.counters = Maps.newHashMap();
+  }
+
+  /** Copy constructor: the new checker gets its own copies of both maps. */
+  private AppMetricsChecker(AppMetricsChecker checker) {
+    this.gaugesInt = Maps.newHashMap(checker.gaugesInt);
+    this.counters = Maps.newHashMap(checker.counters);
+  }
+
+  /**
+   * Creates a checker that starts with the expectations of another checker.
+   *
+   * @param checker the checker whose expected values are copied
+   * @return a new, independent checker
+   */
+  public static AppMetricsChecker createFromChecker(AppMetricsChecker checker) {
+    return new AppMetricsChecker(checker);
+  }
+
+  /**
+   * Creates a checker that expects every application metric to be zero.
+   *
+   * @return a new checker
+   */
+  public static AppMetricsChecker create() {
+    return new AppMetricsChecker(INITIAL_CHECKER);
+  }
+
+  /**
+   * Sets the expected value of a metric that is asserted as an int gauge.
+   *
+   * @param key the metric
+   * @param value the expected value
+   * @return this checker, for chaining
+   */
+  AppMetricsChecker gaugeInt(AppMetricsKey key, int value) {
+    gaugesInt.put(key, value);
+    return this;
+  }
+
+  /**
+   * Sets the expected value of a metric that is asserted as a counter.
+   *
+   * @param key the metric
+   * @param value the expected value
+   * @return this checker, for chaining
+   */
+  AppMetricsChecker counter(AppMetricsKey key, int value) {
+    counters.put(key, value);
+    return this;
+  }
+
+  /**
+   * Asserts every expected gauge and counter against the given source.
+   *
+   * @param source the metrics source to read the current values from
+   * @param all passed through to {@code MetricsAsserts.getMetrics}
+   * @return this checker, for chaining
+   * @throws IllegalStateException if {@code source} is null
+   */
+  AppMetricsChecker checkAgainst(MetricsSource source, boolean all) {
+    if (source == null) {
+      throw new IllegalStateException(
+          "MetricsSource should not be null!");
+    }
+    MetricsRecordBuilder recordBuilder = getMetrics(source, all);
+    logAssertingMessage(source);
+
+    for (Map.Entry<AppMetricsKey, Integer> gauge : gaugesInt.entrySet()) {
+      assertGauge(gauge.getKey().value, gauge.getValue(), recordBuilder);
+    }
+
+    for (Map.Entry<AppMetricsKey, Integer> counter : counters.entrySet()) {
+      assertCounter(counter.getKey().value, counter.getValue(), recordBuilder);
+    }
+    return this;
+  }
+
+  /**
+   * Logs, at debug level, which queue and users are about to be asserted.
+   * The queue name and users are read from {@link QueueMetrics} fields, so
+   * any other source is skipped instead of failing on the cast.
+   */
+  private void logAssertingMessage(MetricsSource source) {
+    if (!LOG.isDebugEnabled() || !(source instanceof QueueMetrics)) {
+      return;
+    }
+    QueueMetrics queueMetrics = (QueueMetrics) source;
+    Map<String, QueueMetrics> users = queueMetrics.users;
+    LOG.debug("Asserting App metrics.. QueueName: {}, users: {}",
+        queueMetrics.queueName,
+        users != null && !users.isEmpty() ? users : "");
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java
new file mode 100644
index 0000000..cd617d7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
+
+/**
+ * Fluent test helper that collects the expected values of a queue's
+ * resource metrics and asserts them against a {@link MetricsSource}.
+ * <p>
+ * A checker from {@link #create()} expects every known resource metric to
+ * be zero. {@link #createFromChecker(ResourceMetricsChecker)} copies the
+ * expectations of an existing checker, so a test only has to restate the
+ * values that changed since its previous check.
+ */
+final class ResourceMetricsChecker {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ResourceMetricsChecker.class);
+
+  /** Template expecting zero for every metric; only ever copied. */
+  private static final ResourceMetricsChecker INITIAL_CHECKER =
+      new ResourceMetricsChecker()
+          .gaugeLong(ALLOCATED_MB, 0)
+          .gaugeInt(ALLOCATED_V_CORES, 0)
+          .gaugeInt(ALLOCATED_CONTAINERS, 0)
+          .counter(AGGREGATE_CONTAINERS_ALLOCATED, 0)
+          .counter(AGGREGATE_CONTAINERS_RELEASED, 0)
+          .gaugeLong(AVAILABLE_MB, 0)
+          .gaugeInt(AVAILABLE_V_CORES, 0)
+          .gaugeLong(PENDING_MB, 0)
+          .gaugeInt(PENDING_V_CORES, 0)
+          .gaugeInt(PENDING_CONTAINERS, 0)
+          .gaugeLong(RESERVED_MB, 0)
+          .gaugeInt(RESERVED_V_CORES, 0)
+          .gaugeInt(RESERVED_CONTAINERS, 0);
+
+  /** Resource metrics that can be checked, with their metric names. */
+  enum ResourceMetricsKey {
+    ALLOCATED_MB("AllocatedMB"),
+    ALLOCATED_V_CORES("AllocatedVCores"),
+    ALLOCATED_CONTAINERS("AllocatedContainers"),
+    AGGREGATE_CONTAINERS_ALLOCATED("AggregateContainersAllocated"),
+    AGGREGATE_CONTAINERS_RELEASED("AggregateContainersReleased"),
+    AVAILABLE_MB("AvailableMB"),
+    AVAILABLE_V_CORES("AvailableVCores"),
+    PENDING_MB("PendingMB"),
+    PENDING_V_CORES("PendingVCores"),
+    PENDING_CONTAINERS("PendingContainers"),
+    RESERVED_MB("ReservedMB"),
+    RESERVED_V_CORES("ReservedVCores"),
+    RESERVED_CONTAINERS("ReservedContainers");
+
+    /** Name used to look the metric up in the metrics record. */
+    private final String value;
+
+    ResourceMetricsKey(String value) {
+      this.value = value;
+    }
+
+    /** Returns the name used to look the metric up. */
+    public String getValue() {
+      return value;
+    }
+  }
+
+  /** Expected values asserted as long gauges. */
+  private final Map<ResourceMetricsKey, Long> gaugesLong;
+  /** Expected values asserted as int gauges. */
+  private final Map<ResourceMetricsKey, Integer> gaugesInt;
+  /** Expected values asserted as counters. */
+  private final Map<ResourceMetricsKey, Long> counters;
+
+  private ResourceMetricsChecker() {
+    this.gaugesLong = Maps.newHashMap();
+    this.gaugesInt = Maps.newHashMap();
+    this.counters = Maps.newHashMap();
+  }
+
+  /** Copy constructor: the new checker gets its own copies of all maps. */
+  private ResourceMetricsChecker(ResourceMetricsChecker checker) {
+    this.gaugesLong = Maps.newHashMap(checker.gaugesLong);
+    this.gaugesInt = Maps.newHashMap(checker.gaugesInt);
+    this.counters = Maps.newHashMap(checker.counters);
+  }
+
+  /**
+   * Creates a checker that starts with the expectations of another checker.
+   *
+   * @param checker the checker whose expected values are copied
+   * @return a new, independent checker
+   */
+  public static ResourceMetricsChecker createFromChecker(
+      ResourceMetricsChecker checker) {
+    return new ResourceMetricsChecker(checker);
+  }
+
+  /**
+   * Creates a checker that expects every resource metric to be zero.
+   *
+   * @return a new checker
+   */
+  public static ResourceMetricsChecker create() {
+    return new ResourceMetricsChecker(INITIAL_CHECKER);
+  }
+
+  /**
+   * Sets the expected value of a metric that is asserted as a long gauge.
+   *
+   * @param key the metric
+   * @param value the expected value
+   * @return this checker, for chaining
+   */
+  ResourceMetricsChecker gaugeLong(ResourceMetricsKey key, long value) {
+    gaugesLong.put(key, value);
+    return this;
+  }
+
+  /**
+   * Sets the expected value of a metric that is asserted as an int gauge.
+   *
+   * @param key the metric
+   * @param value the expected value
+   * @return this checker, for chaining
+   */
+  ResourceMetricsChecker gaugeInt(ResourceMetricsKey key, int value) {
+    gaugesInt.put(key, value);
+    return this;
+  }
+
+  /**
+   * Sets the expected value of a metric that is asserted as a counter.
+   *
+   * @param key the metric
+   * @param value the expected value
+   * @return this checker, for chaining
+   */
+  ResourceMetricsChecker counter(ResourceMetricsKey key, long value) {
+    counters.put(key, value);
+    return this;
+  }
+
+  /**
+   * Asserts every expected gauge and counter against the given source.
+   *
+   * @param source the metrics source to read the current values from
+   * @return this checker, for chaining
+   * @throws IllegalStateException if {@code source} is null
+   */
+  ResourceMetricsChecker checkAgainst(MetricsSource source) {
+    if (source == null) {
+      throw new IllegalStateException("MetricsSource should not be null!");
+    }
+    MetricsRecordBuilder recordBuilder = getMetrics(source);
+    logAssertingMessage(source);
+
+    for (Map.Entry<ResourceMetricsKey, Long> gauge : gaugesLong.entrySet()) {
+      assertGauge(gauge.getKey().value, gauge.getValue(), recordBuilder);
+    }
+
+    for (Map.Entry<ResourceMetricsKey, Integer> gauge : gaugesInt.entrySet()) {
+      assertGauge(gauge.getKey().value, gauge.getValue(), recordBuilder);
+    }
+
+    for (Map.Entry<ResourceMetricsKey, Long> counter : counters.entrySet()) {
+      assertCounter(counter.getKey().value, counter.getValue(), recordBuilder);
+    }
+    return this;
+  }
+
+  /**
+   * Logs, at debug level, which queue and users are about to be asserted.
+   * The queue name and users are read from {@link QueueMetrics} fields, so
+   * any other source is skipped instead of failing on the cast.
+   */
+  private void logAssertingMessage(MetricsSource source) {
+    if (!LOG.isDebugEnabled() || !(source instanceof QueueMetrics)) {
+      return;
+    }
+    QueueMetrics queueMetrics = (QueueMetrics) source;
+    Map<String, QueueMetrics> users = queueMetrics.users;
+    LOG.debug("Asserting Resource metrics.. QueueName: {}, users: {}",
+        queueMetrics.queueName,
+        users != null && !users.isEmpty() ? users : "");
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
index 196d4c2..c971d65 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
@@ -19,8 +19,10 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
-import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .AppMetricsChecker.AppMetricsKey.*;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.*;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -45,9 +47,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class TestQueueMetrics {
-  static final int GB = 1024; // MB
+  private static final int GB = 1024; // MB
   private static final Configuration conf = new Configuration();
-
   private MetricsSystem ms;
 
   @Before
@@ -56,7 +57,8 @@ public class TestQueueMetrics {
     QueueMetrics.clearQueueMetrics();
   }
   
-  @Test public void testDefaultSingleQueueMetrics() {
+  @Test
+  public void testDefaultSingleQueueMetrics() {
     String queueName = "single";
     String user = "alice";
 
@@ -67,9 +69,13 @@ public class TestQueueMetrics {
 
     metrics.submitApp(user);
     MetricsSource userSource = userSource(ms, queueName, user);
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
+    AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
+        .counter(APPS_SUBMITTED, 1)
+        .checkAgainst(queueSource, true);
     metrics.submitAppAttempt(user);
-    checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
+    appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
+        .gaugeInt(APPS_PENDING, 1)
+        .checkAgainst(queueSource, true);
 
     metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
         Resources.createResource(100*GB, 100));
@@ -77,34 +83,63 @@ public class TestQueueMetrics {
         user, 5, Resources.createResource(3*GB, 3));
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
-    checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
+    ResourceMetricsChecker rmChecker = ResourceMetricsChecker.create()
+        .gaugeLong(AVAILABLE_MB, 100 * GB)
+        .gaugeInt(AVAILABLE_V_CORES, 100)
+        .gaugeLong(PENDING_MB, 15 * GB)
+        .gaugeInt(PENDING_V_CORES, 15)
+        .gaugeInt(PENDING_CONTAINERS, 5)
+        .checkAgainst(queueSource);
 
     metrics.runAppAttempt(app.getApplicationId(), user);
-    checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
+    appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
+        .gaugeInt(APPS_PENDING, 0)
+        .gaugeInt(APPS_RUNNING, 1)
+        .checkAgainst(queueSource, true);
 
     metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
         user, 3, Resources.createResource(2*GB, 2), true);
-    checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
+    rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
+        .gaugeLong(ALLOCATED_MB, 6 * GB)
+        .gaugeInt(ALLOCATED_V_CORES, 6)
+        .gaugeInt(ALLOCATED_CONTAINERS, 3)
+        .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
+        .gaugeLong(PENDING_MB, 9 * GB)
+        .gaugeInt(PENDING_V_CORES, 9)
+        .gaugeInt(PENDING_CONTAINERS, 2)
+        .checkAgainst(queueSource);
 
     metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
         user, 1, Resources.createResource(2*GB, 2));
-    checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
+    rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
+        .gaugeLong(ALLOCATED_MB, 4 * GB)
+        .gaugeInt(ALLOCATED_V_CORES, 4)
+        .gaugeInt(ALLOCATED_CONTAINERS, 2)
+        .counter(AGGREGATE_CONTAINERS_RELEASED, 1)
+        .checkAgainst(queueSource);
 
     metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
         user, 0, Resources.createResource(2 * GB, 2));
-    checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2,
-        0, 0, 0);
+    //nothing should change in values
+    rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
+        .checkAgainst(queueSource);
 
     metrics.decrPendingResources(RMNodeLabelsManager.NO_LABEL,
         user, 0, Resources.createResource(2 * GB, 2));
-    checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2,
-        0, 0, 0);
+    //nothing should change in values
+    ResourceMetricsChecker.createFromChecker(rmChecker)
+        .checkAgainst(queueSource);
 
     metrics.finishAppAttempt(
         app.getApplicationId(), app.isPending(), app.getUser());
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
+    appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
+        .counter(APPS_SUBMITTED, 1)
+        .gaugeInt(APPS_RUNNING, 0)
+        .checkAgainst(queueSource, true);
     metrics.finishApp(user, RMAppState.FINISHED);
-    checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
+    AppMetricsChecker.createFromChecker(appMetricsChecker)
+        .counter(APPS_COMPLETED, 1)
+        .checkAgainst(queueSource, true);
     assertNull(userSource);
   }
   
@@ -120,50 +155,77 @@ public class TestQueueMetrics {
 
     metrics.submitApp(user);
     MetricsSource userSource = userSource(ms, queueName, user);
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
+    AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
+        .counter(APPS_SUBMITTED, 1)
+        .checkAgainst(queueSource, true);
     metrics.submitAppAttempt(user);
-    checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
+    appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
+        .gaugeInt(APPS_PENDING, 1)
+        .checkAgainst(queueSource, true);
 
     metrics.runAppAttempt(app.getApplicationId(), user);
-    checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
+    appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
+        .gaugeInt(APPS_PENDING, 0)
+        .gaugeInt(APPS_RUNNING, 1)
+        .checkAgainst(queueSource, true);
 
     metrics.finishAppAttempt(
         app.getApplicationId(), app.isPending(), app.getUser());
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
+    appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
+        .gaugeInt(APPS_RUNNING, 0)
+        .checkAgainst(queueSource, true);
 
     // As the application has failed, framework retries the same application
     // based on configuration
     metrics.submitAppAttempt(user);
-    checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
+    appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
+        .gaugeInt(APPS_PENDING, 1)
+        .checkAgainst(queueSource, true);
 
     metrics.runAppAttempt(app.getApplicationId(), user);
-    checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
+    appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
+        .gaugeInt(APPS_PENDING, 0)
+        .gaugeInt(APPS_RUNNING, 1)
+        .checkAgainst(queueSource, true);
 
     // Suppose say application has failed this time as well.
     metrics.finishAppAttempt(
         app.getApplicationId(), app.isPending(), app.getUser());
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
+    appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
+        .gaugeInt(APPS_RUNNING, 0)
+        .checkAgainst(queueSource, true);
 
     // As the application has failed, framework retries the same application
     // based on configuration
     metrics.submitAppAttempt(user);
-    checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
+    appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
+        .gaugeInt(APPS_PENDING, 1)
+        .checkAgainst(queueSource, true);
 
     metrics.runAppAttempt(app.getApplicationId(), user);
-    checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
+    appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
+        .gaugeInt(APPS_PENDING, 0)
+        .gaugeInt(APPS_RUNNING, 1)
+        .checkAgainst(queueSource, true);
 
     // Suppose say application has failed, and there's no more retries.
     metrics.finishAppAttempt(
         app.getApplicationId(), app.isPending(), app.getUser());
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
+    appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
+        .gaugeInt(APPS_RUNNING, 0)
+        .checkAgainst(queueSource, true);
 
     metrics.finishApp(user, RMAppState.FAILED);
-    checkApps(queueSource, 1, 0, 0, 0, 1, 0, true);
+    AppMetricsChecker.createFromChecker(appMetricsChecker)
+        .gaugeInt(APPS_RUNNING, 0)
+        .counter(APPS_FAILED, 1)
+        .checkAgainst(queueSource, true);
 
     assertNull(userSource);
   }
 
-  @Test public void testSingleQueueWithUserMetrics() {
+  @Test
+  public void testSingleQueueWithUserMetrics() {
     String queueName = "single2";
     String user = "dodo";
 
@@ -175,12 +237,22 @@ public class TestQueueMetrics {
     metrics.submitApp(user);
     MetricsSource userSource = userSource(ms, queueName, user);
 
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
-    checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
+    AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
+        .counter(APPS_SUBMITTED, 1)
+        .checkAgainst(queueSource, true);
+    AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker.create()
+        .counter(APPS_SUBMITTED, 1)
+        .checkAgainst(userSource, true);
 
     metrics.submitAppAttempt(user);
-    checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
-    checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
+    appMetricsQueueSourceChecker = AppMetricsChecker
+        .createFromChecker(appMetricsQueueSourceChecker)
+        .gaugeInt(APPS_PENDING, 1)
+        .checkAgainst(queueSource, true);
+    appMetricsUserSourceChecker = AppMetricsChecker
+        .createFromChecker(appMetricsUserSourceChecker)
+        .gaugeInt(APPS_PENDING, 1)
+        .checkAgainst(userSource, true);
 
     metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
         Resources.createResource(100*GB, 100));
@@ -188,36 +260,97 @@ public class TestQueueMetrics {
         user, Resources.createResource(10*GB, 10));
     metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
         user, 5, Resources.createResource(3*GB, 3));
+
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
-    checkResources(queueSource, 0, 0, 0, 0, 0,  100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
-    checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
+    ResourceMetricsChecker resMetricsQueueSourceChecker =
+        ResourceMetricsChecker.create()
+            .gaugeLong(AVAILABLE_MB, 100 * GB)
+            .gaugeInt(AVAILABLE_V_CORES, 100)
+            .gaugeLong(PENDING_MB, 15 * GB)
+            .gaugeInt(PENDING_V_CORES, 15)
+            .gaugeInt(PENDING_CONTAINERS, 5)
+            .checkAgainst(queueSource);
+    ResourceMetricsChecker resMetricsUserSourceChecker =
+        ResourceMetricsChecker.create()
+            .gaugeLong(AVAILABLE_MB, 10 * GB)
+            .gaugeInt(AVAILABLE_V_CORES, 10)
+            .gaugeLong(PENDING_MB, 15 * GB)
+            .gaugeInt(PENDING_V_CORES, 15)
+            .gaugeInt(PENDING_CONTAINERS, 5)
+            .checkAgainst(userSource);
 
     metrics.runAppAttempt(app.getApplicationId(), user);
-    checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
-    checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
+    appMetricsQueueSourceChecker = AppMetricsChecker
+        .createFromChecker(appMetricsQueueSourceChecker)
+            .gaugeInt(APPS_PENDING, 0)
+            .gaugeInt(APPS_RUNNING, 1)
+            .checkAgainst(queueSource, true);
+    appMetricsUserSourceChecker = AppMetricsChecker
+        .createFromChecker(appMetricsUserSourceChecker)
+            .gaugeInt(APPS_PENDING, 0)
+            .gaugeInt(APPS_RUNNING, 1)
+            .checkAgainst(userSource, true);
 
     metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
         user, 3, Resources.createResource(2*GB, 2), true);
-    checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
-    checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
+    resMetricsQueueSourceChecker =
+        ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
+            .gaugeLong(ALLOCATED_MB, 6 * GB)
+            .gaugeInt(ALLOCATED_V_CORES, 6)
+            .gaugeInt(ALLOCATED_CONTAINERS, 3)
+            .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
+            .gaugeLong(PENDING_MB, 9 * GB)
+            .gaugeInt(PENDING_V_CORES, 9)
+            .gaugeInt(PENDING_CONTAINERS, 2)
+            .checkAgainst(queueSource);
+    resMetricsUserSourceChecker =
+        ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
+            .gaugeLong(ALLOCATED_MB, 6 * GB)
+            .gaugeInt(ALLOCATED_V_CORES, 6)
+            .gaugeInt(ALLOCATED_CONTAINERS, 3)
+            .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
+            .gaugeLong(PENDING_MB, 9 * GB)
+            .gaugeInt(PENDING_V_CORES, 9)
+            .gaugeInt(PENDING_CONTAINERS, 2)
+            .checkAgainst(userSource);
 
     metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
         user, 1, Resources.createResource(2*GB, 2));
-    checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
-    checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
+    ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
+            .gaugeLong(ALLOCATED_MB, 4 * GB)
+            .gaugeInt(ALLOCATED_V_CORES, 4)
+            .gaugeInt(ALLOCATED_CONTAINERS, 2)
+            .counter(AGGREGATE_CONTAINERS_RELEASED, 1)
+            .checkAgainst(queueSource);
+    ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
+            .gaugeLong(ALLOCATED_MB, 4 * GB)
+            .gaugeInt(ALLOCATED_V_CORES, 4)
+            .gaugeInt(ALLOCATED_CONTAINERS, 2)
+            .counter(AGGREGATE_CONTAINERS_RELEASED, 1)
+            .checkAgainst(userSource);
 
     metrics.finishAppAttempt(
         app.getApplicationId(), app.isPending(), app.getUser());
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
-    checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
+    appMetricsQueueSourceChecker =
+        AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
+            .gaugeInt(APPS_RUNNING, 0)
+            .checkAgainst(queueSource, true);
+    appMetricsUserSourceChecker =
+        AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
+            .gaugeInt(APPS_RUNNING, 0)
+            .checkAgainst(userSource, true);
     metrics.finishApp(user, RMAppState.FINISHED);
-    checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
-    checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
+    AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
+        .counter(APPS_COMPLETED, 1)
+        .checkAgainst(queueSource, true);
+    AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
+        .counter(APPS_COMPLETED, 1)
+        .checkAgainst(userSource, true);
   }
 
-
-  @Test public void testNodeTypeMetrics() {
+  @Test
+  public void testNodeTypeMetrics() {
     String parentQueueName = "root";
     String leafQueueName = "root.leaf";
     String user = "alice";
@@ -237,33 +370,32 @@ public class TestQueueMetrics {
     MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
 
     metrics.incrNodeTypeAggregations(user, NodeType.NODE_LOCAL);
-    checkAggregatedNodeTypes(queueSource,1L,0L,0L);
-    checkAggregatedNodeTypes(parentQueueSource,1L,0L,0L);
-    checkAggregatedNodeTypes(userSource,1L,0L,0L);
-    checkAggregatedNodeTypes(parentUserSource,1L,0L,0L);
+    checkAggregatedNodeTypes(queueSource, 1L, 0L, 0L);
+    checkAggregatedNodeTypes(parentQueueSource, 1L, 0L, 0L);
+    checkAggregatedNodeTypes(userSource, 1L, 0L, 0L);
+    checkAggregatedNodeTypes(parentUserSource, 1L, 0L, 0L);
 
     metrics.incrNodeTypeAggregations(user, NodeType.RACK_LOCAL);
-    checkAggregatedNodeTypes(queueSource,1L,1L,0L);
-    checkAggregatedNodeTypes(parentQueueSource,1L,1L,0L);
-    checkAggregatedNodeTypes(userSource,1L,1L,0L);
-    checkAggregatedNodeTypes(parentUserSource,1L,1L,0L);
+    checkAggregatedNodeTypes(queueSource, 1L, 1L, 0L);
+    checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 0L);
+    checkAggregatedNodeTypes(userSource, 1L, 1L, 0L);
+    checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 0L);
 
     metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH);
-    checkAggregatedNodeTypes(queueSource,1L,1L,1L);
-    checkAggregatedNodeTypes(parentQueueSource,1L,1L,1L);
-    checkAggregatedNodeTypes(userSource,1L,1L,1L);
-    checkAggregatedNodeTypes(parentUserSource,1L,1L,1L);
+    checkAggregatedNodeTypes(queueSource, 1L, 1L, 1L);
+    checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 1L);
+    checkAggregatedNodeTypes(userSource, 1L, 1L, 1L);
+    checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 1L);
 
     metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH);
-    checkAggregatedNodeTypes(queueSource,1L,1L,2L);
-    checkAggregatedNodeTypes(parentQueueSource,1L,1L,2L);
-    checkAggregatedNodeTypes(userSource,1L,1L,2L);
-    checkAggregatedNodeTypes(parentUserSource,1L,1L,2L);
-
+    checkAggregatedNodeTypes(queueSource, 1L, 1L, 2L);
+    checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 2L);
+    checkAggregatedNodeTypes(userSource, 1L, 1L, 2L);
+    checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 2L);
   }
 
-
-  @Test public void testTwoLevelWithUserMetrics() {
+  @Test
+  public void testTwoLevelWithUserMetrics() {
     String parentQueueName = "root";
     String leafQueueName = "root.leaf";
     String user = "alice";
@@ -282,16 +414,38 @@ public class TestQueueMetrics {
     MetricsSource userSource = userSource(ms, leafQueueName, user);
     MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
 
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
-    checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true);
-    checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
-    checkApps(parentUserSource, 1, 0, 0, 0, 0, 0, true);
+    AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
+        .counter(APPS_SUBMITTED, 1)
+        .checkAgainst(queueSource, true);
+    AppMetricsChecker appMetricsParentQueueSourceChecker =
+        AppMetricsChecker.create()
+        .counter(APPS_SUBMITTED, 1)
+        .checkAgainst(parentQueueSource, true);
+    AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker.create()
+        .counter(APPS_SUBMITTED, 1)
+        .checkAgainst(userSource, true);
+    AppMetricsChecker appMetricsParentUserSourceChecker =
+        AppMetricsChecker.create()
+        .counter(APPS_SUBMITTED, 1)
+        .checkAgainst(parentUserSource, true);
 
     metrics.submitAppAttempt(user);
-    checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
-    checkApps(parentQueueSource, 1, 1, 0, 0, 0, 0, true);
-    checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
-    checkApps(parentUserSource, 1, 1, 0, 0, 0, 0, true);
+    appMetricsQueueSourceChecker =
+        AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
+        .gaugeInt(APPS_PENDING, 1)
+        .checkAgainst(queueSource, true);
+    appMetricsParentQueueSourceChecker =
+        AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker)
+        .gaugeInt(APPS_PENDING, 1)
+        .checkAgainst(parentQueueSource, true);
+    appMetricsUserSourceChecker =
+        AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
+        .gaugeInt(APPS_PENDING, 1)
+        .checkAgainst(userSource, true);
+    appMetricsParentUserSourceChecker =
+        AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker)
+        .gaugeInt(APPS_PENDING, 1)
+        .checkAgainst(parentUserSource, true);
 
     parentMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
         Resources.createResource(100*GB, 100));
@@ -303,14 +457,51 @@ public class TestQueueMetrics {
         user, Resources.createResource(10*GB, 10));
     metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
         user, 5, Resources.createResource(3*GB, 3));
-    checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
-    checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
-    checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
-    checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
+
+    ResourceMetricsChecker resMetricsQueueSourceChecker =
+        ResourceMetricsChecker.create()
+        .gaugeLong(AVAILABLE_MB, 100 * GB)
+        .gaugeInt(AVAILABLE_V_CORES, 100)
+        .gaugeLong(PENDING_MB, 15 * GB)
+        .gaugeInt(PENDING_V_CORES, 15)
+        .gaugeInt(PENDING_CONTAINERS, 5)
+        .checkAgainst(queueSource);
+    ResourceMetricsChecker resMetricsParentQueueSourceChecker =
+        ResourceMetricsChecker.create()
+            .gaugeLong(AVAILABLE_MB, 100 * GB)
+            .gaugeInt(AVAILABLE_V_CORES, 100)
+            .gaugeLong(PENDING_MB, 15 * GB)
+            .gaugeInt(PENDING_V_CORES, 15)
+            .gaugeInt(PENDING_CONTAINERS, 5)
+            .checkAgainst(parentQueueSource);
+    ResourceMetricsChecker resMetricsUserSourceChecker =
+        ResourceMetricsChecker.create()
+            .gaugeLong(AVAILABLE_MB, 10 * GB)
+            .gaugeInt(AVAILABLE_V_CORES, 10)
+            .gaugeLong(PENDING_MB, 15 * GB)
+            .gaugeInt(PENDING_V_CORES, 15)
+            .gaugeInt(PENDING_CONTAINERS, 5)
+            .checkAgainst(userSource);
+    ResourceMetricsChecker resMetricsParentUserSourceChecker =
+        ResourceMetricsChecker.create()
+            .gaugeLong(AVAILABLE_MB, 10 * GB)
+            .gaugeInt(AVAILABLE_V_CORES, 10)
+            .gaugeLong(PENDING_MB, 15 * GB)
+            .gaugeInt(PENDING_V_CORES, 15)
+            .gaugeInt(PENDING_CONTAINERS, 5)
+            .checkAgainst(parentUserSource);
 
     metrics.runAppAttempt(app.getApplicationId(), user);
-    checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
-    checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
+    appMetricsQueueSourceChecker =
+        AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
+            .gaugeInt(APPS_PENDING, 0)
+            .gaugeInt(APPS_RUNNING, 1)
+            .checkAgainst(queueSource, true);
+    appMetricsUserSourceChecker =
+        AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
+            .gaugeInt(APPS_PENDING, 0)
+            .gaugeInt(APPS_RUNNING, 1)
+            .checkAgainst(userSource, true);
 
     metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
         user, 3, Resources.createResource(2*GB, 2), true);
@@ -318,32 +509,139 @@ public class TestQueueMetrics {
         user, Resources.createResource(3*GB, 3));
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
-    checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
-    checkResources(parentQueueSource, 6*GB, 6, 3, 3, 0,  100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
-    checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
-    checkResources(parentUserSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
+    resMetricsQueueSourceChecker =
+        ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
+            .gaugeLong(ALLOCATED_MB, 6 * GB)
+            .gaugeInt(ALLOCATED_V_CORES, 6)
+            .gaugeInt(ALLOCATED_CONTAINERS, 3)
+            .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
+            .gaugeLong(PENDING_MB, 9 * GB)
+            .gaugeInt(PENDING_V_CORES, 9)
+            .gaugeInt(PENDING_CONTAINERS, 2)
+            .gaugeLong(RESERVED_MB, 3 * GB)
+            .gaugeInt(RESERVED_V_CORES, 3)
+            .gaugeInt(RESERVED_CONTAINERS, 1)
+            .checkAgainst(queueSource);
+    resMetricsParentQueueSourceChecker =
+        ResourceMetricsChecker
+            .createFromChecker(resMetricsParentQueueSourceChecker)
+            .gaugeLong(ALLOCATED_MB, 6 * GB)
+            .gaugeInt(ALLOCATED_V_CORES, 6)
+            .gaugeInt(ALLOCATED_CONTAINERS, 3)
+            .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
+            .gaugeLong(PENDING_MB, 9 * GB)
+            .gaugeInt(PENDING_V_CORES, 9)
+            .gaugeInt(PENDING_CONTAINERS, 2)
+            .gaugeLong(RESERVED_MB, 3 * GB)
+            .gaugeInt(RESERVED_V_CORES, 3)
+            .gaugeInt(RESERVED_CONTAINERS, 1)
+            .checkAgainst(parentQueueSource);
+    resMetricsUserSourceChecker =
+        ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
+            .gaugeLong(ALLOCATED_MB, 6 * GB)
+            .gaugeInt(ALLOCATED_V_CORES, 6)
+            .gaugeInt(ALLOCATED_CONTAINERS, 3)
+            .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
+            .gaugeLong(PENDING_MB, 9 * GB)
+            .gaugeInt(PENDING_V_CORES, 9)
+            .gaugeInt(PENDING_CONTAINERS, 2)
+            .gaugeLong(RESERVED_MB, 3 * GB)
+            .gaugeInt(RESERVED_V_CORES, 3)
+            .gaugeInt(RESERVED_CONTAINERS, 1)
+        .checkAgainst(userSource);
+    resMetricsParentUserSourceChecker = ResourceMetricsChecker
+        .createFromChecker(resMetricsParentUserSourceChecker)
+            .gaugeLong(ALLOCATED_MB, 6 * GB)
+            .gaugeInt(ALLOCATED_V_CORES, 6)
+            .gaugeInt(ALLOCATED_CONTAINERS, 3)
+            .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
+            .gaugeLong(PENDING_MB, 9 * GB)
+            .gaugeInt(PENDING_V_CORES, 9)
+            .gaugeInt(PENDING_CONTAINERS, 2)
+            .gaugeLong(RESERVED_MB, 3 * GB)
+            .gaugeInt(RESERVED_V_CORES, 3)
+            .gaugeInt(RESERVED_CONTAINERS, 1)
+            .checkAgainst(parentUserSource);
 
     metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
         user, 1, Resources.createResource(2*GB, 2));
     metrics.unreserveResource(RMNodeLabelsManager.NO_LABEL,
           user, Resources.createResource(3*GB, 3));
-    checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
-    checkResources(parentQueueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
-    checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
-    checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
+    ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
+        .gaugeLong(ALLOCATED_MB, 4 * GB)
+        .gaugeInt(ALLOCATED_V_CORES, 4)
+        .gaugeInt(ALLOCATED_CONTAINERS, 2)
+        .counter(AGGREGATE_CONTAINERS_RELEASED, 1)
+        .gaugeLong(RESERVED_MB, 0)
+        .gaugeInt(RESERVED_V_CORES, 0)
+        .gaugeInt(RESERVED_CONTAINERS, 0)
+        .checkAgainst(queueSource);
+    ResourceMetricsChecker.createFromChecker(resMetricsParentQueueSourceChecker)
+        .gaugeLong(ALLOCATED_MB, 4 * GB)
+        .gaugeInt(ALLOCATED_V_CORES, 4)
+        .gaugeInt(ALLOCATED_CONTAINERS, 2)
+        .counter(AGGREGATE_CONTAINERS_RELEASED, 1)
+        .gaugeLong(RESERVED_MB, 0)
+        .gaugeInt(RESERVED_V_CORES, 0)
+        .gaugeInt(RESERVED_CONTAINERS, 0)
+        .checkAgainst(parentQueueSource);
+    ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
+        .gaugeLong(ALLOCATED_MB, 4 * GB)
+        .gaugeInt(ALLOCATED_V_CORES, 4)
+        .gaugeInt(ALLOCATED_CONTAINERS, 2)
+        .counter(AGGREGATE_CONTAINERS_RELEASED, 1)
+        .gaugeLong(RESERVED_MB, 0)
+        .gaugeInt(RESERVED_V_CORES, 0)
+        .gaugeInt(RESERVED_CONTAINERS, 0)
+        .checkAgainst(userSource);
+    ResourceMetricsChecker.createFromChecker(resMetricsParentUserSourceChecker)
+        .gaugeLong(ALLOCATED_MB, 4 * GB)
+        .gaugeInt(ALLOCATED_V_CORES, 4)
+        .gaugeInt(ALLOCATED_CONTAINERS, 2)
+        .counter(AGGREGATE_CONTAINERS_RELEASED, 1)
+        .gaugeLong(RESERVED_MB, 0)
+        .gaugeInt(RESERVED_V_CORES, 0)
+        .gaugeInt(RESERVED_CONTAINERS, 0)
+        .checkAgainst(parentUserSource);
 
     metrics.finishAppAttempt(
         app.getApplicationId(), app.isPending(), app.getUser());
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
-    checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true);
-    checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
-    checkApps(parentUserSource, 1, 0, 0, 0, 0, 0, true);
+    appMetricsQueueSourceChecker = AppMetricsChecker
+        .createFromChecker(appMetricsQueueSourceChecker)
+            .counter(APPS_SUBMITTED, 1)
+            .gaugeInt(APPS_RUNNING, 0)
+            .checkAgainst(queueSource, true);
+    appMetricsParentQueueSourceChecker = AppMetricsChecker
+            .createFromChecker(appMetricsParentQueueSourceChecker)
+            .counter(APPS_SUBMITTED, 1)
+            .gaugeInt(APPS_PENDING, 0)
+            .gaugeInt(APPS_RUNNING, 0)
+            .checkAgainst(parentQueueSource, true);
+    appMetricsUserSourceChecker = AppMetricsChecker
+            .createFromChecker(appMetricsUserSourceChecker)
+            .counter(APPS_SUBMITTED, 1)
+            .gaugeInt(APPS_RUNNING, 0)
+            .checkAgainst(userSource, true);
+    appMetricsParentUserSourceChecker = AppMetricsChecker
+            .createFromChecker(appMetricsParentUserSourceChecker)
+            .counter(APPS_SUBMITTED, 1)
+            .gaugeInt(APPS_PENDING, 0)
+            .gaugeInt(APPS_RUNNING, 0)
+            .checkAgainst(parentUserSource, true);
 
     metrics.finishApp(user, RMAppState.FINISHED);
-    checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
-    checkApps(parentQueueSource, 1, 0, 0, 1, 0, 0, true);
-    checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
-    checkApps(parentUserSource, 1, 0, 0, 1, 0, 0, true);
+    AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
+        .counter(APPS_COMPLETED, 1)
+        .checkAgainst(queueSource, true);
+    AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker)
+        .counter(APPS_COMPLETED, 1)
+        .checkAgainst(parentQueueSource, true);
+    AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
+        .counter(APPS_COMPLETED, 1)
+        .checkAgainst(userSource, true);
+    AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker)
+        .counter(APPS_COMPLETED, 1)
+        .checkAgainst(parentUserSource, true);
   }
   
   @Test 
@@ -383,8 +681,9 @@ public class TestQueueMetrics {
       FifoScheduler.class, ResourceScheduler.class);
     MockRM rm = new MockRM(conf);
     QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
-    checkApps(metrics, 0, 0, 0, 0, 0, 0, true);
-    MetricsAsserts.assertGauge("ReservedContainers", 0, metrics);
+    AppMetricsChecker.create()
+        .checkAgainst(metrics, true);
+    MetricsAsserts.assertGauge(RESERVED_CONTAINERS.getValue(), 0, metrics);
   }
 
   // This is to test all metrics can consistently show up if specified true to
@@ -396,52 +695,23 @@ public class TestQueueMetrics {
     QueueMetrics.forQueue(ms, queueName, null, false, conf);
     MetricsSource queueSource = queueSource(ms, queueName);
 
-    checkApps(queueSource, 0, 0, 0, 0, 0, 0, true);
+    AppMetricsChecker.create()
+        .checkAgainst(queueSource, true);
     try {
       // do not collect all metrics
-      checkApps(queueSource, 0, 0, 0, 0, 0, 0, false);
+      AppMetricsChecker.create()
+          .checkAgainst(queueSource, false);
       Assert.fail();
     } catch (AssertionError e) {
-      Assert.assertTrue(e.getMessage().contains(
-        "Expected exactly one metric for name "));
+      Assert.assertTrue(
+              e.getMessage().contains("Expected exactly one metric for name "));
     }
     // collect all metrics
-    checkApps(queueSource, 0, 0, 0, 0, 0, 0, true);
-  }
-
-  public static void checkApps(MetricsSource source, int submitted, int pending,
-      int running, int completed, int failed, int killed, boolean all) {
-    MetricsRecordBuilder rb = getMetrics(source, all);
-    assertCounter("AppsSubmitted", submitted, rb);
-    assertGauge("AppsPending", pending, rb);
-    assertGauge("AppsRunning", running, rb);
-    assertCounter("AppsCompleted", completed, rb);
-    assertCounter("AppsFailed", failed, rb);
-    assertCounter("AppsKilled", killed, rb);
-  }
-
-  public static void checkResources(MetricsSource source, long allocatedMB,
-      int allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
-      long aggreReleasedCtnrs, long availableMB, int availableCores, long pendingMB,
-      int pendingCores, int pendingCtnrs, long reservedMB, int reservedCores,
-      int reservedCtnrs) {
-    MetricsRecordBuilder rb = getMetrics(source);
-    assertGauge("AllocatedMB", allocatedMB, rb);
-    assertGauge("AllocatedVCores", allocatedCores, rb);
-    assertGauge("AllocatedContainers", allocCtnrs, rb);
-    assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb);
-    assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb);
-    assertGauge("AvailableMB", availableMB, rb);
-    assertGauge("AvailableVCores", availableCores, rb);
-    assertGauge("PendingMB", pendingMB, rb);
-    assertGauge("PendingVCores", pendingCores, rb);
-    assertGauge("PendingContainers", pendingCtnrs, rb);
-    assertGauge("ReservedMB", reservedMB, rb);
-    assertGauge("ReservedVCores", reservedCores, rb);
-    assertGauge("ReservedContainers", reservedCtnrs, rb);
+    AppMetricsChecker.create()
+        .checkAgainst(queueSource, true);
   }
 
-  public static void checkAggregatedNodeTypes(MetricsSource source,
+  private static void checkAggregatedNodeTypes(MetricsSource source,
       long nodeLocal, long rackLocal, long offSwitch) {
     MetricsRecordBuilder rb = getMetrics(source);
     assertCounter("AggregateNodeLocalContainersAllocated", nodeLocal, rb);
@@ -459,14 +729,12 @@ public class TestQueueMetrics {
   }
 
   public static MetricsSource queueSource(MetricsSystem ms, String queue) {
-    MetricsSource s = ms.getSource(QueueMetrics.sourceName(queue).toString());
-    return s;
+    return ms.getSource(QueueMetrics.sourceName(queue).toString());
   }
 
-  public static MetricsSource userSource(MetricsSystem ms, String queue,
-                                         String user) {
-    MetricsSource s = ms.getSource(QueueMetrics.sourceName(queue).
+  private static MetricsSource userSource(MetricsSystem ms, String queue,
+      String user) {
+    return ms.getSource(QueueMetrics.sourceName(queue).
         append(",user=").append(user).toString());
-    return s;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org