You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/04/30 22:34:50 UTC
svn commit: r1098152 - in /hadoop/mapreduce/branches/MR-279/yarn:
yarn-common/src/main/java/org/apache/hadoop/yarn/util/
yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/
yarn-server/yarn-serv...
Author: mahadev
Date: Sat Apr 30 20:34:49 2011
New Revision: 1098152
URL: http://svn.apache.org/viewvc?rev=1098152&view=rev
Log:
MAPREDUCE-2434. Metrics for ResourceManager. Contributed by Luke Lu. - added files
Added:
hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Records.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Self.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Records.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Records.java?rev=1098152&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Records.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Records.java Sat Apr 30 20:34:49 2011
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.util;
+
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+/**
+ * Convenient API record utils
+ */
+public class Records {
+ // The default record factory
+ private static final RecordFactory factory =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ public static <T> T newRecord(Class<T> cls) {
+ return factory.newRecordInstance(cls);
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Self.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Self.java?rev=1098152&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Self.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Self.java Sat Apr 30 20:34:49 2011
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.util;
+
+/**
+ * Some utilities for introspection
+ */
+public class Self {
+ private static boolean firstTime = true;
+ private static boolean isUnitTest = false;
+ private static boolean isJUnitTest = false;
+
+ public synchronized static boolean isUnitTest() {
+ detect();
+ return isUnitTest;
+ }
+
+ public synchronized static boolean isJUnitTest() {
+ detect();
+ return isJUnitTest;
+ }
+
+ private synchronized static void detect() {
+ if (!firstTime) {
+ return;
+ }
+ firstTime = false;
+ for (StackTraceElement e : new Throwable().getStackTrace()) {
+ String className = e.getClassName();
+ if (className.startsWith("org.junit")) {
+ isUnitTest = isJUnitTest = true;
+ return;
+ }
+ if (className.startsWith("org.apache.maven.surefire")) {
+ isUnitTest = true;
+ return;
+ }
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java?rev=1098152&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java Sat Apr 30 20:34:49 2011
@@ -0,0 +1,97 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.resource;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+
+@Private
+@Evolving
+public class Resources {
+ // Java doesn't have const :(
+ private static final Resource NONE = createResource(0);
+
+ public static Resource createResource(int memory) {
+ Resource resource = Records.newRecord(Resource.class);
+ resource.setMemory(memory);
+ return resource;
+ }
+
+ public static Resource none() {
+ assert NONE.getMemory() == 0 : "NONE should be empty";
+ return NONE;
+ }
+
+ public static Resource clone(Resource res) {
+ return createResource(res.getMemory());
+ }
+
+ public static Resource addTo(Resource lhs, Resource rhs) {
+ lhs.setMemory(lhs.getMemory() + rhs.getMemory());
+ return lhs;
+ }
+
+ public static Resource add(Resource lhs, Resource rhs) {
+ return addTo(clone(lhs), rhs);
+ }
+
+ public static Resource subtractFrom(Resource lhs, Resource rhs) {
+ lhs.setMemory(lhs.getMemory() - rhs.getMemory());
+ return lhs;
+ }
+
+ public static Resource subtract(Resource lhs, Resource rhs) {
+ return subtractFrom(clone(lhs), rhs);
+ }
+
+ public static Resource negate(Resource resource) {
+ return subtract(NONE, resource);
+ }
+
+ public static Resource multiplyTo(Resource lhs, int by) {
+ lhs.setMemory(lhs.getMemory() * by);
+ return lhs;
+ }
+
+ public static Resource multiply(Resource lhs, int by) {
+ return multiplyTo(clone(lhs), by);
+ }
+
+ public static boolean equals(Resource lhs, Resource rhs) {
+ return lhs.getMemory() == rhs.getMemory();
+ }
+
+ public static boolean lessThan(Resource lhs, Resource rhs) {
+ return lhs.getMemory() < rhs.getMemory();
+ }
+
+ public static boolean lessThanOrEqual(Resource lhs, Resource rhs) {
+ return lhs.getMemory() <= rhs.getMemory();
+ }
+
+ public static boolean greaterThan(Resource lhs, Resource rhs) {
+ return lhs.getMemory() > rhs.getMemory();
+ }
+
+ public static boolean greaterThanOrEqual(Resource lhs, Resource rhs) {
+ return lhs.getMemory() >= rhs.getMemory();
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1098152&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Sat Apr 30 20:34:49 2011
@@ -0,0 +1,256 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import com.google.common.base.Splitter;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Self;
+import static org.apache.hadoop.yarn.server.resourcemanager.resource.Resources.*;
+
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+@InterfaceAudience.Private
+@Metrics(context="yarn")
+public class QueueMetrics {
+ @Metric("# of apps submitted") MutableCounterInt appsSubmitted;
+ @Metric("# of running apps") MutableGaugeInt appsRunning;
+ @Metric("# of pending apps") MutableGaugeInt appsPending;
+ @Metric("# of apps completed") MutableCounterInt appsCompleted;
+ @Metric("# of apps killed") MutableCounterInt appsKilled;
+ @Metric("# of apps failed") MutableCounterInt appsFailed;
+
+ @Metric("Allocated memory in GiB") MutableGaugeInt allocatedGB;
+ @Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
+ @Metric("Available memory in GiB") MutableGaugeInt availableGB;
+ @Metric("Pending memory allocation in GiB") MutableGaugeInt pendingGB;
+ @Metric("# of pending containers") MutableGaugeInt pendingContainers;
+
+ static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class);
+ static final int GB = 1024; // resource.memory is in MB
+ static final MetricsInfo RECORD_INFO = info("SchedulerMetrics",
+ "Metrics for the resource scheduler");
+ static final MetricsInfo QUEUE_INFO = info("Queue", "Metrics by queue");
+ static final MetricsInfo USER_INFO = info("User", "Metrics by user");
+ static final Splitter Q_SPLITTER =
+ Splitter.on('.').omitEmptyStrings().trimResults();
+
+ // MSXXX: until metrics system handle this automatically
+ private static QueueMetrics dummyMetrics = null;
+ private static boolean firstTime = true;
+
+ final MetricsRegistry registry;
+ final String queueName;
+ final QueueMetrics parent;
+ final MetricsSystem metricsSystem;
+ private final Map<String, QueueMetrics> users;
+
+ QueueMetrics(MetricsSystem ms, String queueName, Queue parent, boolean enableUserMetrics) {
+ registry = new MetricsRegistry(RECORD_INFO);
+ this.queueName = queueName;
+ this.parent = parent != null ? parent.getMetrics() : null;
+ this.users = enableUserMetrics ? new HashMap<String, QueueMetrics>()
+ : null;
+ metricsSystem = ms;
+ }
+
+ QueueMetrics tag(MetricsInfo info, String value) {
+ registry.tag(info, value);
+ return this;
+ }
+
+ static StringBuilder sourceName(String queueName) {
+ StringBuilder sb = new StringBuilder(RECORD_INFO.name());
+ int i = 0;
+ for (String node : Q_SPLITTER.split(queueName)) {
+ sb.append(",q").append(i++).append('=').append(node);
+ }
+ return sb;
+ }
+
+ public synchronized
+ static QueueMetrics forQueue(String queueName, Queue parent,
+ boolean enableUserMetrics) {
+ MetricsSystem ms = null;
+ if (firstTime) {
+ firstTime = false;
+ ms = DefaultMetricsSystem.instance();
+ } else if (!Self.isUnitTest()) {
+ ms = DefaultMetricsSystem.instance();
+ } else {
+ return dummyMetrics; // metrics specific tests should use the long form
+ }
+ QueueMetrics metrics = forQueue(ms, queueName, parent, enableUserMetrics);
+ if (dummyMetrics == null) {
+ dummyMetrics = metrics;
+ }
+ return metrics;
+ }
+
+ public static QueueMetrics forQueue(MetricsSystem ms, String queueName,
+ Queue parent, boolean enableUserMetrics) {
+ QueueMetrics metrics = new QueueMetrics(ms, queueName, parent,
+ enableUserMetrics).tag(QUEUE_INFO, queueName);
+ return ms == null ? metrics : ms.register(sourceName(queueName).toString(),
+ "Metrics for queue: " + queueName, metrics);
+ }
+
+ synchronized QueueMetrics getUserMetrics(String userName) {
+ if (users == null) {
+ return null;
+ }
+ QueueMetrics metrics = users.get(userName);
+ if (metrics == null) {
+ metrics = new QueueMetrics(metricsSystem, queueName, null, false);
+ users.put(userName, metrics);
+ metricsSystem.register(
+ sourceName(queueName).append(",user=").append(userName).toString(),
+ "Metrics for user '"+ userName +"' in queue '"+ queueName +"'",
+ metrics.tag(QUEUE_INFO, queueName).tag(USER_INFO, userName));
+ }
+ return metrics;
+ }
+
+ public void submitApp(String user) {
+ appsSubmitted.incr();
+ appsPending.incr();
+ QueueMetrics userMetrics = getUserMetrics(user);
+ if (userMetrics != null) {
+ userMetrics.submitApp(user);
+ }
+ if (parent != null) {
+ parent.submitApp(user);
+ }
+ }
+
+ public void incrAppsRunning(String user) {
+ appsRunning.incr();
+ appsPending.decr();
+ QueueMetrics userMetrics = getUserMetrics(user);
+ if (userMetrics != null) {
+ userMetrics.incrAppsRunning(user);
+ }
+ if (parent != null) {
+ parent.incrAppsRunning(user);
+ }
+ }
+
+ public void finishApp(Application app) {
+ ApplicationState state = app.getState();
+ switch (state) {
+ case KILLED: appsKilled.incr(); break;
+ case FAILED: appsFailed.incr(); break;
+ default: appsCompleted.incr(); break;
+ }
+ if (app.isPending()) {
+ appsPending.decr();
+ } else {
+ appsRunning.decr();
+ }
+ QueueMetrics userMetrics = getUserMetrics(app.getUser());
+ if (userMetrics != null) {
+ userMetrics.finishApp(app);
+ }
+ if (parent != null) {
+ parent.finishApp(app);
+ }
+ }
+
+ /**
+ * Set available resources. To be called by scheduler periodically as
+ * resources become available.
+ * @param mb memory in MB
+ */
+ public void setAvailableQueueMemory(int mb) {
+ availableGB.set(mb/GB);
+ }
+
+ /**
+ * Set available resources. To be called by scheduler periodically as
+ * resources become available.
+ * @param user
+ * @param mb memory in MB
+ */
+ public void setAvailableUserMemory(String user, int mb) {
+ QueueMetrics userMetrics = getUserMetrics(user);
+ if (userMetrics != null) {
+ userMetrics.setAvailableQueueMemory(mb);
+ }
+ }
+
+ /**
+ * Increment pending resource metrics
+ * @param user
+ * @param containers
+ * @param res the TOTAL delta of resources note this is different from
+ * the other APIs which use per container resource
+ */
+ public void incrPendingResources(String user, int containers, Resource res) {
+ _incrPendingResources(containers, res);
+ QueueMetrics userMetrics = getUserMetrics(user);
+ if (userMetrics != null) {
+ userMetrics.incrPendingResources(user, containers, res);
+ }
+ if (parent != null) {
+ parent.incrPendingResources(user, containers, res);
+ }
+ }
+
+ private void _incrPendingResources(int containers, Resource res) {
+ pendingContainers.incr(containers);
+ pendingGB.incr(res.getMemory()/GB);
+ }
+
+ public void decrPendingResources(String user, int containers, Resource res) {
+ _decrPendingResources(containers, res);
+ QueueMetrics userMetrics = getUserMetrics(user);
+ if (userMetrics != null) {
+ userMetrics.decrPendingResources(user, containers, res);
+ }
+ if (parent != null) {
+ parent.decrPendingResources(user, containers, res);
+ }
+ }
+
+ private void _decrPendingResources(int containers, Resource res) {
+ pendingContainers.decr(containers);
+ pendingGB.decr(res.getMemory()/GB);
+ }
+
+ public void allocateResources(String user, int containers, Resource res) {
+ allocatedContainers.incr(containers);
+ allocatedGB.incr(res.getMemory()/GB * containers);
+ _decrPendingResources(containers, multiply(res, containers));
+ QueueMetrics userMetrics = getUserMetrics(user);
+ if (userMetrics != null) {
+ userMetrics.allocateResources(user, containers, res);
+ }
+ if (parent != null) {
+ parent.allocateResources(user, containers, res);
+ }
+ }
+
+ public void releaseResources(String user, int containers, Resource res) {
+ allocatedContainers.decr(containers);
+ allocatedGB.decr(res.getMemory()/GB * containers);
+ QueueMetrics userMetrics = getUserMetrics(user);
+ if (userMetrics != null) {
+ userMetrics.releaseResources(user, containers, res);
+ }
+ if (parent != null) {
+ parent.releaseResources(user, containers, res);
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java?rev=1098152&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java Sat Apr 30 20:34:49 2011
@@ -0,0 +1,195 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import static org.apache.hadoop.test.MetricsAsserts.*;
+import static org.apache.hadoop.test.MockitoMaker.*;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class TestQueueMetrics {
+ static final int GB = 1024; // MB
+
+ MetricsSystem ms;
+
+ @Before public void setup() {
+ ms = new MetricsSystemImpl();
+ }
+
+ @Test public void testDefaultSingleQueueMetrics() {
+ String queueName = "single";
+ String user = "alice";
+
+ QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false);
+ MetricsSource queueSource= queueSource(ms, queueName);
+ Application app = mockApp(user);
+
+ metrics.submitApp(user);
+ MetricsSource userSource = userSource(ms, queueName, user);
+ checkApps(queueSource, 1, 1, 0, 0, 0, 0);
+
+ metrics.setAvailableQueueMemory(100*GB);
+ metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
+ // Available resources is set externally, as it depends on dynamic
+ // configurable cluster/queue resources
+ checkResources(queueSource, 0, 0, 100, 15, 5);
+
+ metrics.incrAppsRunning(user);
+ checkApps(queueSource, 1, 0, 1, 0, 0, 0);
+
+ metrics.allocateResources(user, 3, Resources.createResource(2*GB));
+ checkResources(queueSource, 6, 3, 100, 9, 2);
+
+ metrics.releaseResources(user, 1, Resources.createResource(2*GB));
+ checkResources(queueSource, 4, 2, 100, 9, 2);
+
+ metrics.finishApp(app);
+ checkApps(queueSource, 1, 0, 0, 1, 0, 0);
+ assertNull(userSource);
+ }
+
+ @Test public void testSingleQueueWithUserMetrics() {
+ String queueName = "single2";
+ String user = "dodo";
+
+ QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, true);
+ MetricsSource queueSource = queueSource(ms, queueName);
+ Application app = mockApp(user);
+
+ metrics.submitApp(user);
+ MetricsSource userSource = userSource(ms, queueName, user);
+
+ checkApps(queueSource, 1, 1, 0, 0, 0, 0);
+ checkApps(userSource, 1, 1, 0, 0, 0, 0);
+
+ metrics.setAvailableQueueMemory(100*GB);
+ metrics.setAvailableUserMemory(user, 10*GB);
+ metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
+ // Available resources is set externally, as it depends on dynamic
+ // configurable cluster/queue resources
+ checkResources(queueSource, 0, 0, 100, 15, 5);
+ checkResources(userSource, 0, 0, 10, 15, 5);
+
+ metrics.incrAppsRunning(user);
+ checkApps(queueSource, 1, 0, 1, 0, 0, 0);
+ checkApps(userSource, 1, 0, 1, 0, 0, 0);
+
+ metrics.allocateResources(user, 3, Resources.createResource(2*GB));
+ checkResources(queueSource, 6, 3, 100, 9, 2);
+ checkResources(userSource, 6, 3, 10, 9, 2);
+
+ metrics.releaseResources(user, 1, Resources.createResource(2*GB));
+ checkResources(queueSource, 4, 2, 100, 9, 2);
+ checkResources(userSource, 4, 2, 10, 9, 2);
+
+ metrics.finishApp(app);
+ checkApps(queueSource, 1, 0, 0, 1, 0, 0);
+ checkApps(userSource, 1, 0, 0, 1, 0, 0);
+ }
+
+ @Test public void testTwoLevelWithUserMetrics() {
+ String parentQueueName = "root";
+ String leafQueueName = "root.leaf";
+ String user = "alice";
+
+ QueueMetrics parentMetrics =
+ QueueMetrics.forQueue(ms, parentQueueName, null, true);
+ Queue parentQueue = make(stub(Queue.class).returning(parentMetrics).
+ from.getMetrics());
+ QueueMetrics metrics =
+ QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true);
+ MetricsSource parentQueueSource = queueSource(ms, parentQueueName);
+ MetricsSource queueSource = queueSource(ms, leafQueueName);
+ Application app = mockApp(user);
+
+ metrics.submitApp(user);
+ MetricsSource userSource = userSource(ms, leafQueueName, user);
+ MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
+
+ checkApps(queueSource, 1, 1, 0, 0, 0, 0);
+ checkApps(parentQueueSource, 1, 1, 0, 0, 0, 0);
+ checkApps(userSource, 1, 1, 0, 0, 0, 0);
+ checkApps(parentUserSource, 1, 1, 0, 0, 0, 0);
+
+ parentMetrics.setAvailableQueueMemory(100*GB);
+ metrics.setAvailableQueueMemory(100*GB);
+ parentMetrics.setAvailableUserMemory(user, 10*GB);
+ metrics.setAvailableUserMemory(user, 10*GB);
+ metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
+ checkResources(queueSource, 0, 0, 100, 15, 5);
+ checkResources(parentQueueSource, 0, 0, 100, 15, 5);
+ checkResources(userSource, 0, 0, 10, 15, 5);
+ checkResources(parentUserSource, 0, 0, 10, 15, 5);
+
+ metrics.incrAppsRunning(user);
+ checkApps(queueSource, 1, 0, 1, 0, 0, 0);
+ checkApps(userSource, 1, 0, 1, 0, 0, 0);
+
+ metrics.allocateResources(user, 3, Resources.createResource(2*GB));
+ // Available resources is set externally, as it depends on dynamic
+ // configurable cluster/queue resources
+ checkResources(queueSource, 6, 3, 100, 9, 2);
+ checkResources(parentQueueSource, 6, 3, 100, 9, 2);
+ checkResources(userSource, 6, 3, 10, 9, 2);
+ checkResources(parentUserSource, 6, 3, 10, 9, 2);
+
+ metrics.releaseResources(user, 1, Resources.createResource(2*GB));
+ checkResources(queueSource, 4, 2, 100, 9, 2);
+ checkResources(parentQueueSource, 4, 2, 100, 9, 2);
+ checkResources(userSource, 4, 2, 10, 9, 2);
+ checkResources(parentUserSource, 4, 2, 10, 9, 2);
+
+ metrics.finishApp(app);
+ checkApps(queueSource, 1, 0, 0, 1, 0, 0);
+ checkApps(parentQueueSource, 1, 0, 0, 1, 0, 0);
+ checkApps(userSource, 1, 0, 0, 1, 0, 0);
+ checkApps(parentUserSource, 1, 0, 0, 1, 0, 0);
+ }
+
+ public static void checkApps(MetricsSource source, int submitted, int pending,
+ int running, int completed, int failed, int killed) {
+ MetricsRecordBuilder rb = getMetrics(source);
+ assertCounter("AppsSubmitted", submitted, rb);
+ assertGauge("AppsPending", pending, rb);
+ assertGauge("AppsRunning", running, rb);
+ assertCounter("AppsCompleted", completed, rb);
+ assertCounter("AppsFailed", failed, rb);
+ assertCounter("AppsKilled", killed, rb);
+ }
+
+ public static void checkResources(MetricsSource source, int allocGB,
+ int allocCtnrs, int availGB, int pendingGB, int pendingCtnrs) {
+ MetricsRecordBuilder rb = getMetrics(source);
+ assertGauge("AllocatedGB", allocGB, rb);
+ assertGauge("AllocatedContainers", allocCtnrs, rb);
+ assertGauge("AvailableGB", availGB, rb);
+ assertGauge("PendingGB", pendingGB, rb);
+ assertGauge("PendingContainers", pendingCtnrs, rb);
+ }
+
+ private static Application mockApp(String user) {
+ Application app = mock(Application.class);
+ when(app.getState()).thenReturn(ApplicationState.RUNNING);
+ when(app.getUser()).thenReturn(user);
+ return app;
+ }
+
+ public static MetricsSource queueSource(MetricsSystem ms, String queue) {
+ MetricsSource s = ms.getSource(QueueMetrics.sourceName(queue).toString());
+ return s;
+ }
+
+ public static MetricsSource userSource(MetricsSystem ms, String queue,
+ String user) {
+ MetricsSource s = ms.getSource(QueueMetrics.sourceName(queue).
+ append(",user=").append(user).toString());
+ return s;
+ }
+}