You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink that was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2019/10/15 22:29:08 UTC
[hadoop] branch branch-3.1 updated: YARN-8842. Expose metrics for
custom resource types in QueueMetrics. (Contributed by Szilard Nemeth)
This is an automated email from the ASF dual-hosted git repository.
epayne pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new a70c6e9 YARN-8842. Expose metrics for custom resource types in QueueMetrics. (Contributed by Szilard Nemeth)
a70c6e9 is described below
commit a70c6e96652a92054adbbc9514c63384c0f015d2
Author: Haibo Chen <ha...@apache.org>
AuthorDate: Tue Oct 16 14:12:02 2018 -0700
YARN-8842. Expose metrics for custom resource types in QueueMetrics. (Contributed by Szilard Nemeth)
(cherry picked from commit 84e22a6af46db2859d7d2caf192861cae9b6a1a8)
---
.../resourcetypes/ResourceTypesTestHelper.java | 22 +
.../resourcemanager/scheduler/QueueMetrics.java | 129 ++++-
.../scheduler/QueueMetricsForCustomResources.java | 158 +++++
.../scheduler/capacity/CapacityScheduler.java | 5 +-
.../resourcemanager/scheduler/QueueInfo.java | 90 +++
.../scheduler/QueueMetricsTestData.java | 105 ++++
.../scheduler/ResourceMetricsChecker.java | 88 +--
.../scheduler/TestQueueMetrics.java | 250 ++++----
.../TestQueueMetricsForCustomResources.java | 645 +++++++++++++++++++++
9 files changed, 1324 insertions(+), 168 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java
index 98a8a00..3c3c2cc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java
@@ -16,6 +16,7 @@
package org.apache.hadoop.yarn.resourcetypes;
+import com.google.common.collect.Maps;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -24,6 +25,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* Contains helper methods to create Resource and ResourceInformation objects.
@@ -90,4 +92,24 @@ public final class ResourceTypesTestHelper {
return new ResourceValueAndUnit(value, matcher.group(2));
}
+ /**
+ * Collects every resource of {@code res} except memory and vcores.
+ * @param res the resource to inspect
+ * @return a new mutable map from custom resource name to its value
+ */
+ public static Map<String, Long> extractCustomResources(Resource res) {
+ Map<String, Long> customResources = Maps.newHashMap();
+ for (int i = 0; i < res.getResources().length; i++) {
+ ResourceInformation ri = res.getResourceInformation(i);
+ // Filter by name rather than by index so ordering does not matter.
+ if (!ri.getName().equals(ResourceInformation.MEMORY_URI)
+ && !ri.getName().equals(ResourceInformation.VCORES_URI)) {
+ customResources.put(ri.getName(), ri.getValue());
+ }
+ }
+ return customResources;
+ }
+
+ /**
+ * Same as {@link #extractCustomResources(Resource)}, but each value is
+ * rendered with {@link String#valueOf(Object)}.
+ * @param res the resource to inspect
+ * @return map from custom resource name to its value as a string
+ */
+ public static Map<String, String> extractCustomResourcesAsStrings(
+ Resource res) {
+ Map<String, Long> resValues = extractCustomResources(res);
+ return resValues.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey, e -> String.valueOf(e.getValue())));
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
index 4b70502..bb8d1d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
@@ -46,7 +46,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .QueueMetricsForCustomResources.QueueMetricsCustomResource;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,6 +121,7 @@ public class QueueMetrics implements MetricsSource {
protected final MetricsSystem metricsSystem;
protected final Map<String, QueueMetrics> users;
protected final Configuration conf;
+ private QueueMetricsForCustomResources queueMetricsForCustomResources;
protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) {
@@ -130,6 +133,11 @@ public class QueueMetrics implements MetricsSource {
metricsSystem = ms;
this.conf = conf;
runningTime = buildBuckets(conf);
+
+ if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
+ this.queueMetricsForCustomResources =
+ new QueueMetricsForCustomResources();
+ }
}
protected QueueMetrics tag(MetricsInfo info, String value) {
@@ -355,9 +363,12 @@ public class QueueMetrics implements MetricsSource {
* @param limit resource limit
*/
public void setAvailableResourcesToQueue(String partition, Resource limit) {
- if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+ if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
availableMB.set(limit.getMemorySize());
availableVCores.set(limit.getVirtualCores());
+ // queueMetricsForCustomResources is only created (see the constructor)
+ // when more than two resource types are known; otherwise it is null.
+ // setAvailable overwrites, rather than accumulates, each custom value.
+ if (queueMetricsForCustomResources != null) {
+ queueMetricsForCustomResources.setAvailable(limit);
+ }
}
}
@@ -397,7 +408,7 @@ public class QueueMetrics implements MetricsSource {
*/
public void incrPendingResources(String partition, String user,
int containers, Resource res) {
- if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+ if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
_incrPendingResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
@@ -413,12 +424,15 @@ public class QueueMetrics implements MetricsSource {
pendingContainers.incr(containers);
pendingMB.incr(res.getMemorySize() * containers);
pendingVCores.incr(res.getVirtualCores() * containers);
+ if (queueMetricsForCustomResources != null) {
+ queueMetricsForCustomResources.increasePending(res, containers);
+ }
}
public void decrPendingResources(String partition, String user,
int containers, Resource res) {
- if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+ if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
_decrPendingResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
@@ -434,6 +448,9 @@ public class QueueMetrics implements MetricsSource {
pendingContainers.decr(containers);
pendingMB.decr(res.getMemorySize() * containers);
pendingVCores.decr(res.getVirtualCores() * containers);
+ if (queueMetricsForCustomResources != null) {
+ queueMetricsForCustomResources.decreasePending(res, containers);
+ }
}
public void incrNodeTypeAggregations(String user, NodeType type) {
@@ -457,12 +474,16 @@ public class QueueMetrics implements MetricsSource {
public void allocateResources(String partition, String user,
int containers, Resource res, boolean decrPending) {
- if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+ if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemorySize() * containers);
allocatedVCores.incr(res.getVirtualCores() * containers);
+ if (queueMetricsForCustomResources != null) {
+ queueMetricsForCustomResources.increaseAllocated(res, containers);
+ }
+
if (decrPending) {
_decrPendingResources(containers, res);
}
@@ -484,12 +505,18 @@ public class QueueMetrics implements MetricsSource {
* @param res
*/
public void allocateResources(String partition, String user, Resource res) {
- if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+ if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
allocatedMB.incr(res.getMemorySize());
allocatedVCores.incr(res.getVirtualCores());
+ if (queueMetricsForCustomResources != null) {
+ queueMetricsForCustomResources.increaseAllocated(res);
+ }
pendingMB.decr(res.getMemorySize());
pendingVCores.decr(res.getVirtualCores());
+ if (queueMetricsForCustomResources != null) {
+ queueMetricsForCustomResources.decreasePending(res);
+ }
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
@@ -503,11 +530,15 @@ public class QueueMetrics implements MetricsSource {
public void releaseResources(String partition,
String user, int containers, Resource res) {
- if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+ if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
allocatedContainers.decr(containers);
aggregateContainersReleased.incr(containers);
allocatedMB.decr(res.getMemorySize() * containers);
allocatedVCores.decr(res.getVirtualCores() * containers);
+ if (queueMetricsForCustomResources != null) {
+ queueMetricsForCustomResources.decreaseAllocated(res, containers);
+ }
+
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.releaseResources(partition, user, containers, res);
@@ -524,9 +555,13 @@ public class QueueMetrics implements MetricsSource {
* @param user
* @param res
*/
- public void releaseResources(String user, Resource res) {
+ private void releaseResources(String user, Resource res) {
allocatedMB.decr(res.getMemorySize());
allocatedVCores.decr(res.getVirtualCores());
+ if (queueMetricsForCustomResources != null) {
+ queueMetricsForCustomResources.decreaseAllocated(res);
+ }
+
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.releaseResources(user, res);
@@ -557,6 +592,17 @@ public class QueueMetrics implements MetricsSource {
}
}
+ /**
+ * Adds {@code seconds} multiplied by each custom resource value of
+ * {@code res} to this queue's aggregated preempted-seconds metrics, then
+ * applies the same update to the parent queue's metrics (recursively up to
+ * the root). The local update is skipped when custom resource metrics are
+ * not enabled for this queue, but the parent is still updated.
+ * @param res resource of the preempted container
+ * @param seconds duration to weight the resource values by, in seconds as
+ * computed by the caller
+ */
+ public void updatePreemptedSecondsForCustomResources(Resource res,
+ long seconds) {
+ if (queueMetricsForCustomResources != null) {
+ queueMetricsForCustomResources
+ .increaseAggregatedPreemptedSeconds(res, seconds);
+ }
+ if (parent != null) {
+ parent.updatePreemptedSecondsForCustomResources(res, seconds);
+ }
+ }
+
public void updatePreemptedResources(Resource res) {
aggregateMemoryMBPreempted.incr(res.getMemorySize());
aggregateVcoresPreempted.incr(res.getVirtualCores());
@@ -566,7 +612,7 @@ public class QueueMetrics implements MetricsSource {
}
+ /**
+ * Delegates to {@code reserveResource(user, res)} only when
+ * {@code partition} is null or the no-label partition; reservations on
+ * labelled partitions are not recorded by these metrics.
+ */
public void reserveResource(String partition, String user, Resource res) {
- if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+ if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
reserveResource(user, res);
}
}
@@ -575,6 +621,9 @@ public class QueueMetrics implements MetricsSource {
reservedContainers.incr();
reservedMB.incr(res.getMemorySize());
reservedVCores.incr(res.getVirtualCores());
+ if (queueMetricsForCustomResources != null) {
+ queueMetricsForCustomResources.increaseReserved(res);
+ }
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.reserveResource(user, res);
@@ -584,10 +633,13 @@ public class QueueMetrics implements MetricsSource {
}
}
- public void unreserveResource(String user, Resource res) {
+ private void unreserveResource(String user, Resource res) {
reservedContainers.decr();
reservedMB.decr(res.getMemorySize());
reservedVCores.decr(res.getVirtualCores());
+ if (queueMetricsForCustomResources != null) {
+ queueMetricsForCustomResources.decreaseReserved(res);
+ }
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.unreserveResource(user, res);
@@ -598,7 +650,7 @@ public class QueueMetrics implements MetricsSource {
}
+ /**
+ * Delegates to {@code unreserveResource(user, res)} only when
+ * {@code partition} is null or the no-label partition; this is the
+ * public entry point now that the two-argument overload is private.
+ */
public void unreserveResource(String partition, String user, Resource res) {
- if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+ if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
unreserveResource(user, res);
}
}
@@ -660,10 +712,59 @@ public class QueueMetrics implements MetricsSource {
public int getAppsFailed() {
return appsFailed.value();
}
-
+
+ /**
+ * @return allocated memory and vcores, plus the allocated value of each
+ * custom resource when custom resource metrics are enabled
+ */
public Resource getAllocatedResources() {
- return BuilderUtils.newResource(allocatedMB.value(),
- (int) allocatedVCores.value());
+ if (queueMetricsForCustomResources != null) {
+ return Resource.newInstance(allocatedMB.value(), allocatedVCores.value(),
+ queueMetricsForCustomResources.getAllocatedValues());
+ }
+ return Resource.newInstance(allocatedMB.value(),
+ allocatedVCores.value());
+ }
+
+ /**
+ * @return available memory and vcores, plus the available value of each
+ * custom resource when custom resource metrics are enabled
+ */
+ public Resource getAvailableResources() {
+ if (queueMetricsForCustomResources != null) {
+ return Resource.newInstance(availableMB.value(), availableVCores.value(),
+ queueMetricsForCustomResources.getAvailableValues());
+ }
+ return Resource.newInstance(availableMB.value(), availableVCores.value());
+ }
+
+ /**
+ * @return pending memory and vcores, plus the pending value of each
+ * custom resource when custom resource metrics are enabled
+ */
+ public Resource getPendingResources() {
+ if (queueMetricsForCustomResources != null) {
+ return Resource.newInstance(pendingMB.value(), pendingVCores.value(),
+ queueMetricsForCustomResources.getPendingValues());
+ }
+ return Resource.newInstance(pendingMB.value(), pendingVCores.value());
+ }
+
+ /**
+ * @return reserved memory and vcores, plus the reserved value of each
+ * custom resource when custom resource metrics are enabled
+ */
+ public Resource getReservedResources() {
+ if (queueMetricsForCustomResources != null) {
+ return Resource.newInstance(reservedMB.value(), reservedVCores.value(),
+ queueMetricsForCustomResources.getReservedValues());
+ }
+ return Resource.newInstance(reservedMB.value(), reservedVCores.value());
+ }
+
+ /**
+ * Returns the aggregated preempted seconds of each custom resource.
+ * This returns a {@link QueueMetricsCustomResource} instead of a
+ * {@link Resource} because these long values could be truncated if they
+ * were passed as the int vCores parameter of Resource.newInstance.
+ * @return aggregated preempted seconds per custom resource; an empty
+ * holder when custom resource metrics are not enabled for this queue
+ */
+ @VisibleForTesting
+ public QueueMetricsCustomResource getAggregatedPreemptedSecondsResources() {
+ if (queueMetricsForCustomResources == null) {
+ // Only created when >2 resource types are known; avoid an NPE.
+ return new QueueMetricsCustomResource();
+ }
+ return queueMetricsForCustomResources.getAggregatePreemptedSeconds();
+ }
+
+ /** @return the aggregated preempted memory MB-seconds counter. */
+ @VisibleForTesting
+ public MutableCounterLong getAggregateMemoryMBSecondsPreempted() {
+ return aggregateMemoryMBSecondsPreempted;
+ }
+
+ /** @return the aggregated preempted vcore-seconds counter. */
+ @VisibleForTesting
+ public MutableCounterLong getAggregateVcoreSecondsPreempted() {
+ return aggregateVcoreSecondsPreempted;
}
public long getAllocatedMB() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java
new file mode 100644
index 0000000..8029584
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * Main entry point for queue metrics of custom resources: every resource
+ * type after the first two entries of {@link Resource#getResources()}
+ * (assumed to be memory and vcores).
+ * It keeps one {@link QueueMetricsCustomResource} per metric kind
+ * (allocated, available, pending, reserved and aggregated preempted
+ * seconds) and provides increase, decrease and set methods for them.
+ * NOTE(review): the backing maps are plain HashMaps and nothing in this
+ * class synchronizes access; thread-safety depends on the callers.
+ */
+public class QueueMetricsForCustomResources {
+ /**
+ * Class that holds metrics values for custom resources in a map keyed with
+ * the name of the custom resource.
+ * There are different kinds of values like allocated, available and others.
+ */
+ public static class QueueMetricsCustomResource {
+ // Value per custom resource name; a key appears on its first update.
+ private final Map<String, Long> values = Maps.newHashMap();
+
+ /** Adds each custom resource value of {@code res}. */
+ protected void increase(Resource res) {
+ update(res, Long::sum);
+ }
+
+ /**
+ * Adds each custom resource value of {@code res} multiplied by
+ * {@code multiplier}.
+ */
+ void increaseWithMultiplier(Resource res, long multiplier) {
+ update(res, (v1, v2) -> v1 + v2 * multiplier);
+ }
+
+ /** Subtracts each custom resource value of {@code res}. */
+ protected void decrease(Resource res) {
+ update(res, (v1, v2) -> v1 - v2);
+ }
+
+ /**
+ * Subtracts each custom resource value of {@code res} multiplied by
+ * {@code containers}.
+ */
+ void decreaseWithMultiplier(Resource res, int containers) {
+ update(res, (v1, v2) -> v1 - v2 * containers);
+ }
+
+ /** Overwrites each stored value with the value from {@code res}. */
+ protected void set(Resource res) {
+ update(res, (v1, v2) -> v2);
+ }
+
+ /**
+ * Stores {@code operation(current, valueFromRes)} for every custom
+ * resource in {@code res}, treating a missing entry as 0.
+ * Does nothing unless more than two resource types are known.
+ */
+ private void update(Resource res, BiFunction<Long, Long, Long> operation) {
+ if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
+ ResourceInformation[] resources = res.getResources();
+
+ // Indices 0 and 1 are skipped; assumed to hold memory and vcores.
+ for (int i = 2; i < resources.length; i++) {
+ ResourceInformation resource = resources[i];
+
+ // Seed a missing key with 0: for an absent key Map.merge would
+ // store the resource value as-is without applying the operation,
+ // which is wrong for decrease and multiplier operations.
+ if (!values.containsKey(resource.getName())) {
+ values.put(resource.getName(), 0L);
+ }
+ values.merge(resource.getName(),
+ resource.getValue(), operation);
+ }
+ }
+ }
+
+ /**
+ * @return the live backing map (not a copy); it reflects later updates
+ * and is mutable by the caller
+ */
+ public Map<String, Long> getValues() {
+ return values;
+ }
+ }
+ // One value holder per metric kind.
+ private final QueueMetricsCustomResource aggregatePreemptedSeconds =
+ new QueueMetricsCustomResource();
+ private final QueueMetricsCustomResource allocated =
+ new QueueMetricsCustomResource();
+ private final QueueMetricsCustomResource available =
+ new QueueMetricsCustomResource();
+ private final QueueMetricsCustomResource pending =
+ new QueueMetricsCustomResource();
+
+ private final QueueMetricsCustomResource reserved =
+ new QueueMetricsCustomResource();
+
+ public void increaseReserved(Resource res) {
+ reserved.increase(res);
+ }
+
+ public void decreaseReserved(Resource res) {
+ reserved.decrease(res);
+ }
+
+ /** Overwrites the available values with those of {@code res}. */
+ public void setAvailable(Resource res) {
+ available.set(res);
+ }
+
+ /** Adds {@code res} multiplied by {@code containers} to pending. */
+ public void increasePending(Resource res, int containers) {
+ pending.increaseWithMultiplier(res, containers);
+ }
+
+ public void decreasePending(Resource res) {
+ pending.decrease(res);
+ }
+
+ /** Subtracts {@code res} multiplied by {@code containers} from pending. */
+ public void decreasePending(Resource res, int containers) {
+ pending.decreaseWithMultiplier(res, containers);
+ }
+
+ public void increaseAllocated(Resource res) {
+ allocated.increase(res);
+ }
+
+ /** Adds {@code res} multiplied by {@code containers} to allocated. */
+ public void increaseAllocated(Resource res, int containers) {
+ allocated.increaseWithMultiplier(res, containers);
+ }
+
+ public void decreaseAllocated(Resource res) {
+ allocated.decrease(res);
+ }
+
+ /** Subtracts {@code res} multiplied by {@code containers} from allocated. */
+ public void decreaseAllocated(Resource res, int containers) {
+ allocated.decreaseWithMultiplier(res, containers);
+ }
+
+ /** Adds each custom value of {@code res} times {@code seconds}. */
+ public void increaseAggregatedPreemptedSeconds(Resource res, long seconds) {
+ aggregatePreemptedSeconds.increaseWithMultiplier(res, seconds);
+ }
+
+ // The getters below return the live backing maps, not copies.
+ Map<String, Long> getAllocatedValues() {
+ return allocated.getValues();
+ }
+
+ Map<String, Long> getAvailableValues() {
+ return available.getValues();
+ }
+
+ Map<String, Long> getPendingValues() {
+ return pending.getValues();
+ }
+
+ Map<String, Long> getReservedValues() {
+ return reserved.getValues();
+ }
+
+ QueueMetricsCustomResource getAggregatePreemptedSeconds() {
+ return aggregatePreemptedSeconds;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index f5da987..860b1d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -2023,7 +2023,8 @@ public class CapacityScheduler extends
private void updateQueuePreemptionMetrics(
CSQueue queue, RMContainer rmc) {
QueueMetrics qMetrics = queue.getMetrics();
- long usedMillis = rmc.getFinishTime() - rmc.getCreationTime();
+ final long usedMillis = rmc.getFinishTime() - rmc.getCreationTime();
+ final long usedSeconds = usedMillis / DateUtils.MILLIS_PER_SECOND;
Resource containerResource = rmc.getAllocatedResource();
qMetrics.preemptContainer();
long mbSeconds = (containerResource.getMemorySize() * usedMillis)
@@ -2032,6 +2033,8 @@ public class CapacityScheduler extends
/ DateUtils.MILLIS_PER_SECOND;
qMetrics.updatePreemptedMemoryMBSeconds(mbSeconds);
qMetrics.updatePreemptedVcoreSeconds(vcSeconds);
+ qMetrics.updatePreemptedSecondsForCustomResources(containerResource,
+ usedSeconds);
qMetrics.updatePreemptedResources(containerResource);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueInfo.java
new file mode 100644
index 0000000..0a0f893
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueInfo.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+
+import java.util.function.Consumer;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .TestQueueMetrics.userSource;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * This class holds queue and user metrics for a particular queue,
+ * used for testing metrics.
+ * Reference for the parent queue is also stored for every queue,
+ * except if the queue is root.
+ * NOTE(review): the simple name clashes with
+ * org.apache.hadoop.yarn.api.records.QueueInfo; beware of imports.
+ */
+public final class QueueInfo {
+ // null for the root queue.
+ private final QueueInfo parentQueueInfo;
+ // Mockito mock whose getMetrics() returns queueMetrics.
+ private final Queue queue;
+ final QueueMetrics queueMetrics;
+ final MetricsSource queueSource;
+ final MetricsSource userSource;
+
+ /**
+ * Creates metrics (with user metrics enabled) for {@code queueName}
+ * under {@code parent}, and looks up the queue and user metric sources.
+ * @param parent parent queue info, or null for the root queue
+ * @param queueName name of the queue
+ * @param ms metrics system the sources are registered in
+ * @param conf configuration passed to QueueMetrics.forQueue
+ * @param user user whose metrics source is captured
+ */
+ public QueueInfo(QueueInfo parent, String queueName, MetricsSystem ms,
+ Configuration conf, String user) {
+ Queue parentQueue = parent == null ? null : parent.queue;
+ parentQueueInfo = parent;
+ queueMetrics =
+ QueueMetrics.forQueue(ms, queueName, parentQueue, true, conf);
+ queue = mock(Queue.class);
+ when(queue.getMetrics()).thenReturn(queueMetrics);
+ queueSource = ms.getSource(QueueMetrics.sourceName(queueName).toString());
+
+ // need to call getUserMetrics so that a non-null userSource is returned
+ // with the call to userSource(..)
+ queueMetrics.getUserMetrics(user);
+ userSource = userSource(ms, queueName, user);
+ }
+
+ /** @return the top-most ancestor (the queue with no parent). */
+ public QueueInfo getRoot() {
+ QueueInfo root = this;
+ while (root.parentQueueInfo != null) {
+ root = root.parentQueueInfo;
+ }
+ return root;
+ }
+
+ /**
+ * Passes the queue metrics source of this queue and of each ancestor,
+ * from this queue up to the root, to {@code consumer}.
+ */
+ public void checkAllQueueSources(Consumer<MetricsSource> consumer) {
+ checkQueueSourcesRecursive(this, consumer);
+ }
+
+ private void checkQueueSourcesRecursive(QueueInfo queueInfo,
+ Consumer<MetricsSource> consumer) {
+ consumer.accept(queueInfo.queueSource);
+ if (queueInfo.parentQueueInfo != null) {
+ checkQueueSourcesRecursive(queueInfo.parentQueueInfo, consumer);
+ }
+ }
+
+ /**
+ * Passes the QueueMetrics of this queue and of each ancestor, from this
+ * queue up to the root, to {@code consumer}.
+ */
+ public void checkAllQueueMetrics(Consumer<QueueMetrics> consumer) {
+ checkAllQueueMetricsRecursive(this, consumer);
+ }
+
+ private void checkAllQueueMetricsRecursive(QueueInfo queueInfo, Consumer
+ <QueueMetrics> consumer) {
+ consumer.accept(queueInfo.queueMetrics);
+ if (queueInfo.parentQueueInfo != null) {
+ checkAllQueueMetricsRecursive(queueInfo.parentQueueInfo, consumer);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsTestData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsTestData.java
new file mode 100644
index 0000000..56df7d3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsTestData.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper
+ .extractCustomResources;
+
+/**
+ * This class is to test standard and custom resource metrics for all types.
+ * Metrics types can be one of: allocated, pending, reserved
+ * and other resources.
+ * Instances are immutable and created through {@link Builder}.
+ */
+public final class QueueMetricsTestData {
+ /** Builder for {@link QueueMetricsTestData}; obtain via {@link #create()}. */
+ public static final class Builder {
+ private int containers;
+ private Resource resource;
+ private Resource resourceToDecrease;
+ private Map<String, Long> customResourceValues;
+ private int containersToDecrease;
+ private String user;
+ private String partition;
+ private QueueInfo queueInfo;
+
+ private Builder() {
+ }
+
+ public static Builder create() {
+ return new Builder();
+ }
+
+ public Builder withContainers(int containers) {
+ this.containers = containers;
+ return this;
+ }
+
+ public Builder withResourceToDecrease(Resource res, int containers) {
+ this.resourceToDecrease = res;
+ this.containersToDecrease = containers;
+ return this;
+ }
+
+ public Builder withResources(Resource res) {
+ this.resource = res;
+ return this;
+ }
+
+ public Builder withUser(String user) {
+ this.user = user;
+ return this;
+ }
+
+ public Builder withPartition(String partition) {
+ this.partition = partition;
+ return this;
+ }
+
+ public Builder withLeafQueue(QueueInfo qInfo) {
+ this.queueInfo = qInfo;
+ return this;
+ }
+
+ /**
+ * Builds the test data, deriving the custom resource values from the
+ * resource set via {@link #withResources(Resource)}.
+ * NOTE(review): withResources must be called first; otherwise
+ * extractCustomResources dereferences a null resource.
+ */
+ public QueueMetricsTestData build() {
+ this.customResourceValues = extractCustomResources(resource);
+ return new QueueMetricsTestData(this);
+ }
+ }
+
+ // Custom resource values of `resource` (memory and vcores excluded).
+ final Map<String, Long> customResourceValues;
+ final int containers;
+ final Resource resourceToDecrease;
+ final int containersToDecrease;
+ final Resource resource;
+ final String partition;
+ // Set via Builder.withLeafQueue.
+ final QueueInfo leafQueue;
+ final String user;
+
+ private QueueMetricsTestData(Builder builder) {
+ this.customResourceValues = builder.customResourceValues;
+ this.containers = builder.containers;
+ this.resourceToDecrease = builder.resourceToDecrease;
+ this.containersToDecrease = builder.containersToDecrease;
+ this.resource = builder.resource;
+ this.partition = builder.partition;
+ this.leafQueue = builder.queueInfo;
+ this.user = builder.user;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java
index cd617d7..05341aab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java
@@ -27,34 +27,31 @@ import java.util.Map;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
- .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
- .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
- .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
- .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
- .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
- .ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
- .ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
- .ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.COUNTER_LONG;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.GAUGE_INT;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.GAUGE_LONG;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
- .ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
- .ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
final class ResourceMetricsChecker {
private final static Logger LOG =
LoggerFactory.getLogger(ResourceMetricsChecker.class);
+ /**
+ * Metric kind declared for each {@link ResourceMetricsKey}: an int or long
+ * gauge, or an int or long counter.
+ */
+ enum ResourceMetricType {
+ GAUGE_INT,
+ GAUGE_LONG,
+ COUNTER_INT,
+ COUNTER_LONG
+ }
+
private static final ResourceMetricsChecker INITIAL_CHECKER =
new ResourceMetricsChecker()
.gaugeLong(ALLOCATED_MB, 0)
@@ -72,29 +69,41 @@ final class ResourceMetricsChecker {
.gaugeInt(RESERVED_CONTAINERS, 0);
+ /**
+ * Metric names this checker can verify. Each key is paired with the
+ * {@link ResourceMetricType} that the gaugeInt/gaugeLong/counter builder
+ * methods check before recording an expected value.
+ */
 enum ResourceMetricsKey {
- ALLOCATED_MB("AllocatedMB"),
- ALLOCATED_V_CORES("AllocatedVCores"),
- ALLOCATED_CONTAINERS("AllocatedContainers"),
- AGGREGATE_CONTAINERS_ALLOCATED("AggregateContainersAllocated"),
- AGGREGATE_CONTAINERS_RELEASED("AggregateContainersReleased"),
- AVAILABLE_MB("AvailableMB"),
- AVAILABLE_V_CORES("AvailableVCores"),
- PENDING_MB("PendingMB"),
- PENDING_V_CORES("PendingVCores"),
- PENDING_CONTAINERS("PendingContainers"),
- RESERVED_MB("ReservedMB"),
- RESERVED_V_CORES("ReservedVCores"),
- RESERVED_CONTAINERS("ReservedContainers");
+ ALLOCATED_MB("AllocatedMB", GAUGE_LONG),
+ ALLOCATED_V_CORES("AllocatedVCores", GAUGE_INT),
+ ALLOCATED_CONTAINERS("AllocatedContainers", GAUGE_INT),
+ AGGREGATE_CONTAINERS_ALLOCATED("AggregateContainersAllocated",
+ COUNTER_LONG),
+ AGGREGATE_CONTAINERS_RELEASED("AggregateContainersReleased",
+ COUNTER_LONG),
+ AVAILABLE_MB("AvailableMB", GAUGE_LONG),
+ AVAILABLE_V_CORES("AvailableVCores", GAUGE_INT),
+ PENDING_MB("PendingMB", GAUGE_LONG),
+ PENDING_V_CORES("PendingVCores", GAUGE_INT),
+ PENDING_CONTAINERS("PendingContainers", GAUGE_INT),
+ RESERVED_MB("ReservedMB", GAUGE_LONG),
+ RESERVED_V_CORES("ReservedVCores", GAUGE_INT),
+ RESERVED_CONTAINERS("ReservedContainers", GAUGE_INT),
+ AGGREGATE_VCORE_SECONDS_PREEMPTED(
+ "AggregateVcoreSecondsPreempted", COUNTER_LONG),
+ AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED(
+ "AggregateMemoryMBSecondsPreempted", COUNTER_LONG);
- private String value;
+ // Metric name, e.g. "AllocatedMB". Enum constants are shared singletons,
+ // so their state is final and cannot be altered by any test.
+ private final String value;
+ // Declared metric kind, checked by ensureTypeIsCorrect.
+ private final ResourceMetricType type;
- ResourceMetricsKey(String value) {
+ ResourceMetricsKey(String value, ResourceMetricType type) {
 this.value = value;
+ this.type = type;
 }
+ /** @return the metric name, e.g. "AllocatedMB". */
 public String getValue() {
 return value;
 }
+
+ /** @return the metric kind declared for this key. */
+ public ResourceMetricType getType() {
+ return type;
+ }
 }
private final Map<ResourceMetricsKey, Long> gaugesLong;
@@ -123,20 +132,31 @@ final class ResourceMetricsChecker {
}
+ /**
+ * Records the expected value of a long gauge and returns this checker for
+ * chaining. Throws IllegalStateException if the key is not declared as
+ * GAUGE_LONG.
+ */
 ResourceMetricsChecker gaugeLong(ResourceMetricsKey key, long value) {
+ ensureTypeIsCorrect(key, GAUGE_LONG);
 gaugesLong.put(key, value);
 return this;
 }
+ /**
+ * Records the expected value of an int gauge and returns this checker for
+ * chaining. Throws IllegalStateException if the key is not declared as
+ * GAUGE_INT.
+ */
 ResourceMetricsChecker gaugeInt(ResourceMetricsKey key, int value) {
+ ensureTypeIsCorrect(key, GAUGE_INT);
 gaugesInt.put(key, value);
 return this;
 }
+ /**
+ * Records the expected value of a long counter and returns this checker
+ * for chaining. Throws IllegalStateException if the key is not declared as
+ * COUNTER_LONG.
+ */
 ResourceMetricsChecker counter(ResourceMetricsKey key, long value) {
+ ensureTypeIsCorrect(key, COUNTER_LONG);
 counters.put(key, value);
 return this;
 }
+ /**
+ * Guards the typed builder methods: a key may only be recorded through the
+ * builder that matches its declared {@link ResourceMetricType}.
+ *
+ * @param key metric key being recorded
+ * @param requestedType type implied by the builder method that was called
+ * @throws IllegalStateException if the key's declared type differs
+ */
+ private void ensureTypeIsCorrect(ResourceMetricsKey key,
+ ResourceMetricType requestedType) {
+ if (key.type == requestedType) {
+ return;
+ }
+ throw new IllegalStateException("Metrics type should be " + key.type
+ + " instead of " + requestedType + " for metrics: " + key.value);
+ }
+
ResourceMetricsChecker checkAgainst(MetricsSource source) {
if (source == null) {
throw new IllegalStateException("MetricsSource should not be null!");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
index c971d65..2066f60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
@@ -18,15 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
-import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
- .AppMetricsChecker.AppMetricsKey.*;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.*;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
@@ -46,8 +37,40 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_COMPLETED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_FAILED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_PENDING;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_RUNNING;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_SUBMITTED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class TestQueueMetrics {
+ /**
+ * Creates a Mockito mock of {@link Queue} whose getMetrics() returns the
+ * supplied metrics instance.
+ */
+ private static Queue createMockQueue(QueueMetrics metrics) {
+ final Queue mockedQueue = mock(Queue.class);
+ when(mockedQueue.getMetrics()).thenReturn(metrics);
+ return mockedQueue;
+ }
+
private static final int GB = 1024; // MB
+ // User names shared by the tests; USER_2 is used by
+ // testSingleQueueWithUserMetrics, which previously hard-coded "dodo".
+ private static final String USER = "alice";
+ private static final String USER_2 = "dodo";
private static final Configuration conf = new Configuration();
private MetricsSystem ms;
@@ -60,19 +83,18 @@ public class TestQueueMetrics {
@Test
public void testDefaultSingleQueueMetrics() {
String queueName = "single";
- String user = "alice";
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false,
conf);
MetricsSource queueSource= queueSource(ms, queueName);
- AppSchedulingInfo app = mockApp(user);
+ AppSchedulingInfo app = mockApp(USER);
- metrics.submitApp(user);
- MetricsSource userSource = userSource(ms, queueName, user);
+ metrics.submitApp(USER);
+ MetricsSource userSource = userSource(ms, queueName, USER);
AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(queueSource, true);
- metrics.submitAppAttempt(user);
+ metrics.submitAppAttempt(USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
@@ -80,7 +102,7 @@ public class TestQueueMetrics {
metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100));
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
- user, 5, Resources.createResource(3*GB, 3));
+ USER, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
ResourceMetricsChecker rmChecker = ResourceMetricsChecker.create()
@@ -91,14 +113,14 @@ public class TestQueueMetrics {
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(queueSource);
- metrics.runAppAttempt(app.getApplicationId(), user);
+ metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true);
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
- user, 3, Resources.createResource(2*GB, 2), true);
+ USER, 3, Resources.createResource(2*GB, 2), true);
rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
.gaugeInt(ALLOCATED_V_CORES, 6)
@@ -110,7 +132,7 @@ public class TestQueueMetrics {
.checkAgainst(queueSource);
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
- user, 1, Resources.createResource(2*GB, 2));
+ USER, 1, Resources.createResource(2*GB, 2));
rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
@@ -119,13 +141,13 @@ public class TestQueueMetrics {
.checkAgainst(queueSource);
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
- user, 0, Resources.createResource(2 * GB, 2));
+ USER, 0, Resources.createResource(2 * GB, 2));
//nothing should change in values
rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
.checkAgainst(queueSource);
metrics.decrPendingResources(RMNodeLabelsManager.NO_LABEL,
- user, 0, Resources.createResource(2 * GB, 2));
+ USER, 0, Resources.createResource(2 * GB, 2));
//nothing should change in values
ResourceMetricsChecker.createFromChecker(rmChecker)
.checkAgainst(queueSource);
@@ -136,7 +158,7 @@ public class TestQueueMetrics {
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
- metrics.finishApp(user, RMAppState.FINISHED);
+ metrics.finishApp(USER, RMAppState.FINISHED);
AppMetricsChecker.createFromChecker(appMetricsChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(queueSource, true);
@@ -146,24 +168,23 @@ public class TestQueueMetrics {
@Test
public void testQueueAppMetricsForMultipleFailures() {
String queueName = "single";
- String user = "alice";
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false,
new Configuration());
MetricsSource queueSource = queueSource(ms, queueName);
- AppSchedulingInfo app = mockApp(user);
+ AppSchedulingInfo app = mockApp(USER);
- metrics.submitApp(user);
- MetricsSource userSource = userSource(ms, queueName, user);
+ metrics.submitApp(USER);
+ MetricsSource userSource = userSource(ms, queueName, USER);
AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(queueSource, true);
- metrics.submitAppAttempt(user);
+ metrics.submitAppAttempt(USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
- metrics.runAppAttempt(app.getApplicationId(), user);
+ metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
@@ -177,12 +198,12 @@ public class TestQueueMetrics {
// As the application has failed, framework retries the same application
// based on configuration
- metrics.submitAppAttempt(user);
+ metrics.submitAppAttempt(USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
- metrics.runAppAttempt(app.getApplicationId(), user);
+ metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
@@ -197,12 +218,12 @@ public class TestQueueMetrics {
// As the application has failed, framework retries the same application
// based on configuration
- metrics.submitAppAttempt(user);
+ metrics.submitAppAttempt(USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
- metrics.runAppAttempt(app.getApplicationId(), user);
+ metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
@@ -215,7 +236,7 @@ public class TestQueueMetrics {
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
- metrics.finishApp(user, RMAppState.FAILED);
+ metrics.finishApp(USER, RMAppState.FAILED);
AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_RUNNING, 0)
.counter(APPS_FAILED, 1)
@@ -227,15 +248,14 @@ public class TestQueueMetrics {
@Test
public void testSingleQueueWithUserMetrics() {
String queueName = "single2";
- String user = "dodo";
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, true,
conf);
MetricsSource queueSource = queueSource(ms, queueName);
- AppSchedulingInfo app = mockApp(user);
+ AppSchedulingInfo app = mockApp(USER_2);
- metrics.submitApp(user);
- MetricsSource userSource = userSource(ms, queueName, user);
+ metrics.submitApp(USER_2);
+ MetricsSource userSource = userSource(ms, queueName, USER_2);
AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
@@ -244,7 +264,7 @@ public class TestQueueMetrics {
.counter(APPS_SUBMITTED, 1)
.checkAgainst(userSource, true);
- metrics.submitAppAttempt(user);
+ metrics.submitAppAttempt(USER_2);
appMetricsQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 1)
@@ -257,9 +277,9 @@ public class TestQueueMetrics {
metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100));
metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
- user, Resources.createResource(10*GB, 10));
+ USER_2, Resources.createResource(10*GB, 10));
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
- user, 5, Resources.createResource(3*GB, 3));
+ USER_2, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
@@ -280,7 +300,7 @@ public class TestQueueMetrics {
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(userSource);
- metrics.runAppAttempt(app.getApplicationId(), user);
+ metrics.runAppAttempt(app.getApplicationId(), USER_2);
appMetricsQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 0)
@@ -293,7 +313,7 @@ public class TestQueueMetrics {
.checkAgainst(userSource, true);
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
- user, 3, Resources.createResource(2*GB, 2), true);
+ USER_2, 3, Resources.createResource(2*GB, 2), true);
resMetricsQueueSourceChecker =
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
@@ -316,7 +336,7 @@ public class TestQueueMetrics {
.checkAgainst(userSource);
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
- user, 1, Resources.createResource(2*GB, 2));
+ USER_2, 1, Resources.createResource(2*GB, 2));
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
@@ -340,7 +360,7 @@ public class TestQueueMetrics {
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(userSource, true);
- metrics.finishApp(user, RMAppState.FINISHED);
+ metrics.finishApp(USER_2, RMAppState.FINISHED);
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(queueSource, true);
@@ -353,7 +373,6 @@ public class TestQueueMetrics {
public void testNodeTypeMetrics() {
String parentQueueName = "root";
String leafQueueName = "root.leaf";
- String user = "alice";
QueueMetrics parentMetrics =
QueueMetrics.forQueue(ms, parentQueueName, null, true, conf);
@@ -365,29 +384,29 @@ public class TestQueueMetrics {
MetricsSource queueSource = queueSource(ms, leafQueueName);
//AppSchedulingInfo app = mockApp(user);
- metrics.submitApp(user);
- MetricsSource userSource = userSource(ms, leafQueueName, user);
- MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
+ metrics.submitApp(USER);
+ MetricsSource userSource = userSource(ms, leafQueueName, USER);
+ MetricsSource parentUserSource = userSource(ms, parentQueueName, USER);
- metrics.incrNodeTypeAggregations(user, NodeType.NODE_LOCAL);
+ metrics.incrNodeTypeAggregations(USER, NodeType.NODE_LOCAL);
checkAggregatedNodeTypes(queueSource, 1L, 0L, 0L);
checkAggregatedNodeTypes(parentQueueSource, 1L, 0L, 0L);
checkAggregatedNodeTypes(userSource, 1L, 0L, 0L);
checkAggregatedNodeTypes(parentUserSource, 1L, 0L, 0L);
- metrics.incrNodeTypeAggregations(user, NodeType.RACK_LOCAL);
+ metrics.incrNodeTypeAggregations(USER, NodeType.RACK_LOCAL);
checkAggregatedNodeTypes(queueSource, 1L, 1L, 0L);
checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 0L);
checkAggregatedNodeTypes(userSource, 1L, 1L, 0L);
checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 0L);
- metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH);
+ metrics.incrNodeTypeAggregations(USER, NodeType.OFF_SWITCH);
checkAggregatedNodeTypes(queueSource, 1L, 1L, 1L);
checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 1L);
checkAggregatedNodeTypes(userSource, 1L, 1L, 1L);
checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 1L);
- metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH);
+ metrics.incrNodeTypeAggregations(USER, NodeType.OFF_SWITCH);
checkAggregatedNodeTypes(queueSource, 1L, 1L, 2L);
checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 2L);
checkAggregatedNodeTypes(userSource, 1L, 1L, 2L);
@@ -396,67 +415,60 @@ public class TestQueueMetrics {
@Test
public void testTwoLevelWithUserMetrics() {
- String parentQueueName = "root";
- String leafQueueName = "root.leaf";
- String user = "alice";
+ AppSchedulingInfo app = mockApp(USER);
- QueueMetrics parentMetrics =
- QueueMetrics.forQueue(ms, parentQueueName, null, true, conf);
- Queue parentQueue = mock(Queue.class);
- when(parentQueue.getMetrics()).thenReturn(parentMetrics);
- QueueMetrics metrics =
- QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, conf);
- MetricsSource parentQueueSource = queueSource(ms, parentQueueName);
- MetricsSource queueSource = queueSource(ms, leafQueueName);
- AppSchedulingInfo app = mockApp(user);
-
- metrics.submitApp(user);
- MetricsSource userSource = userSource(ms, leafQueueName, user);
- MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
+ QueueInfo root = new QueueInfo(null, "root", ms, conf, USER);
+ QueueInfo leaf = new QueueInfo(root, "root.leaf", ms, conf, USER);
+ leaf.queueMetrics.submitApp(USER);
AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
- .checkAgainst(queueSource, true);
+ .checkAgainst(leaf.queueSource, true);
AppMetricsChecker appMetricsParentQueueSourceChecker =
AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
- .checkAgainst(parentQueueSource, true);
+ .checkAgainst(root.queueSource, true);
AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
- .checkAgainst(userSource, true);
+ .checkAgainst(leaf.userSource, true);
AppMetricsChecker appMetricsParentUserSourceChecker =
AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
- .checkAgainst(parentUserSource, true);
+ .checkAgainst(root.userSource, true);
- metrics.submitAppAttempt(user);
+ leaf.queueMetrics.submitAppAttempt(USER);
appMetricsQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 1)
- .checkAgainst(queueSource, true);
+ .checkAgainst(leaf.queueSource, true);
appMetricsParentQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker)
.gaugeInt(APPS_PENDING, 1)
- .checkAgainst(parentQueueSource, true);
+ .checkAgainst(root.queueSource, true);
appMetricsUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_PENDING, 1)
- .checkAgainst(userSource, true);
+ .checkAgainst(leaf.userSource, true);
appMetricsParentUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker)
.gaugeInt(APPS_PENDING, 1)
- .checkAgainst(parentUserSource, true);
+ .checkAgainst(root.userSource, true);
- parentMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
+ root.queueMetrics.setAvailableResourcesToQueue(
+ RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100));
- metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
+ leaf.queueMetrics.setAvailableResourcesToQueue(
+ RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100));
- parentMetrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
- user, Resources.createResource(10*GB, 10));
- metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
- user, Resources.createResource(10*GB, 10));
- metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
- user, 5, Resources.createResource(3*GB, 3));
+ root.queueMetrics.setAvailableResourcesToUser(
+ RMNodeLabelsManager.NO_LABEL,
+ USER, Resources.createResource(10*GB, 10));
+ leaf.queueMetrics.setAvailableResourcesToUser(
+ RMNodeLabelsManager.NO_LABEL,
+ USER, Resources.createResource(10*GB, 10));
+ leaf.queueMetrics.incrPendingResources(
+ RMNodeLabelsManager.NO_LABEL,
+ USER, 5, Resources.createResource(3*GB, 3));
ResourceMetricsChecker resMetricsQueueSourceChecker =
ResourceMetricsChecker.create()
@@ -465,7 +477,7 @@ public class TestQueueMetrics {
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
- .checkAgainst(queueSource);
+ .checkAgainst(leaf.queueSource);
ResourceMetricsChecker resMetricsParentQueueSourceChecker =
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 100 * GB)
@@ -473,7 +485,7 @@ public class TestQueueMetrics {
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
- .checkAgainst(parentQueueSource);
+ .checkAgainst(root.queueSource);
ResourceMetricsChecker resMetricsUserSourceChecker =
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 10 * GB)
@@ -481,7 +493,7 @@ public class TestQueueMetrics {
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
- .checkAgainst(userSource);
+ .checkAgainst(leaf.userSource);
ResourceMetricsChecker resMetricsParentUserSourceChecker =
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 10 * GB)
@@ -489,24 +501,24 @@ public class TestQueueMetrics {
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
- .checkAgainst(parentUserSource);
+ .checkAgainst(root.userSource);
- metrics.runAppAttempt(app.getApplicationId(), user);
+ leaf.queueMetrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
- .checkAgainst(queueSource, true);
+ .checkAgainst(leaf.queueSource, true);
appMetricsUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
- .checkAgainst(userSource, true);
+ .checkAgainst(leaf.userSource, true);
- metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
- user, 3, Resources.createResource(2*GB, 2), true);
- metrics.reserveResource(RMNodeLabelsManager.NO_LABEL,
- user, Resources.createResource(3*GB, 3));
+ leaf.queueMetrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
+ USER, 3, Resources.createResource(2*GB, 2), true);
+ leaf.queueMetrics.reserveResource(RMNodeLabelsManager.NO_LABEL,
+ USER, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
resMetricsQueueSourceChecker =
@@ -521,7 +533,7 @@ public class TestQueueMetrics {
.gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1)
- .checkAgainst(queueSource);
+ .checkAgainst(leaf.queueSource);
resMetricsParentQueueSourceChecker =
ResourceMetricsChecker
.createFromChecker(resMetricsParentQueueSourceChecker)
@@ -535,7 +547,7 @@ public class TestQueueMetrics {
.gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1)
- .checkAgainst(parentQueueSource);
+ .checkAgainst(root.queueSource);
resMetricsUserSourceChecker =
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
@@ -548,7 +560,7 @@ public class TestQueueMetrics {
.gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1)
- .checkAgainst(userSource);
+ .checkAgainst(leaf.userSource);
resMetricsParentUserSourceChecker = ResourceMetricsChecker
.createFromChecker(resMetricsParentUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
@@ -561,12 +573,12 @@ public class TestQueueMetrics {
.gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1)
- .checkAgainst(parentUserSource);
+ .checkAgainst(root.userSource);
- metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
- user, 1, Resources.createResource(2*GB, 2));
- metrics.unreserveResource(RMNodeLabelsManager.NO_LABEL,
- user, Resources.createResource(3*GB, 3));
+ leaf.queueMetrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
+ USER, 1, Resources.createResource(2*GB, 2));
+ leaf.queueMetrics.unreserveResource(RMNodeLabelsManager.NO_LABEL,
+ USER, Resources.createResource(3*GB, 3));
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
@@ -575,7 +587,7 @@ public class TestQueueMetrics {
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0)
- .checkAgainst(queueSource);
+ .checkAgainst(leaf.queueSource);
ResourceMetricsChecker.createFromChecker(resMetricsParentQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
@@ -584,7 +596,7 @@ public class TestQueueMetrics {
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0)
- .checkAgainst(parentQueueSource);
+ .checkAgainst(root.queueSource);
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
@@ -593,7 +605,7 @@ public class TestQueueMetrics {
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0)
- .checkAgainst(userSource);
+ .checkAgainst(leaf.userSource);
ResourceMetricsChecker.createFromChecker(resMetricsParentUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
@@ -602,46 +614,46 @@ public class TestQueueMetrics {
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0)
- .checkAgainst(parentUserSource);
+ .checkAgainst(root.userSource);
- metrics.finishAppAttempt(
+ leaf.queueMetrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
appMetricsQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsQueueSourceChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_RUNNING, 0)
- .checkAgainst(queueSource, true);
+ .checkAgainst(leaf.queueSource, true);
appMetricsParentQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsParentQueueSourceChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 0)
- .checkAgainst(parentQueueSource, true);
+ .checkAgainst(root.queueSource, true);
appMetricsUserSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsUserSourceChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_RUNNING, 0)
- .checkAgainst(userSource, true);
+ .checkAgainst(leaf.userSource, true);
appMetricsParentUserSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsParentUserSourceChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 0)
- .checkAgainst(parentUserSource, true);
+ .checkAgainst(root.userSource, true);
- metrics.finishApp(user, RMAppState.FINISHED);
+ leaf.queueMetrics.finishApp(USER, RMAppState.FINISHED);
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.counter(APPS_COMPLETED, 1)
- .checkAgainst(queueSource, true);
+ .checkAgainst(leaf.queueSource, true);
AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker)
.counter(APPS_COMPLETED, 1)
- .checkAgainst(parentQueueSource, true);
+ .checkAgainst(root.queueSource, true);
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.counter(APPS_COMPLETED, 1)
- .checkAgainst(userSource, true);
+ .checkAgainst(leaf.userSource, true);
AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker)
.counter(APPS_COMPLETED, 1)
- .checkAgainst(parentUserSource, true);
+ .checkAgainst(root.userSource, true);
}
@Test
@@ -719,7 +731,7 @@ public class TestQueueMetrics {
assertCounter("AggregateOffSwitchContainersAllocated", offSwitch, rb);
}
- private static AppSchedulingInfo mockApp(String user) {
+ static AppSchedulingInfo mockApp(String user) {
AppSchedulingInfo app = mock(AppSchedulingInfo.class);
when(app.getUser()).thenReturn(user);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
@@ -732,7 +744,7 @@ public class TestQueueMetrics {
return ms.getSource(QueueMetrics.sourceName(queue).toString());
}
- private static MetricsSource userSource(MetricsSystem ms, String queue,
+ public static MetricsSource userSource(MetricsSystem ms, String queue,
String user) {
return ms.getSource(QueueMetrics.sourceName(queue).
append(",user=").append(user).toString());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java
new file mode 100644
index 0000000..76a9849
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java
@@ -0,0 +1,645 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .QueueMetricsForCustomResources.QueueMetricsCustomResource;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES;
+import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper
+ .extractCustomResourcesAsStrings;
+import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper.newResource;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_VCORE_SECONDS_PREEMPTED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics.queueSource;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests that {@link QueueMetrics} reports the custom resource types
+ * {@code custom_res_1} and {@code custom_res_2} alongside memory and vcores
+ * for the available, pending, allocated, reserved and preempted-seconds
+ * metrics.
+ */
+public class TestQueueMetricsForCustomResources {
+  /** The metric families whose custom resource values are asserted. */
+  public enum MetricsForCustomResource {
+    ALLOCATED, AVAILABLE, PENDING, RESERVED, AGGREGATE_PREEMPTED_SECONDS
+  }
+
+  public static final long GB = 1024; // MB
+  private static final Configuration CONF = new Configuration();
+  private static final String CUSTOM_RES_1 = "custom_res_1";
+  private static final String CUSTOM_RES_2 = "custom_res_2";
+  public static final String USER = "alice";
+  private Resource defaultResource;
+  private MetricsSystem ms;
+
+  /**
+   * Resets metrics state, registers the resource types and builds the
+   * default resource before every test.
+   */
+  @Before
+  public void setUp() {
+    ms = new MetricsSystemImpl();
+    QueueMetrics.clearQueueMetrics();
+    initializeResourceTypes();
+    createDefaultResource();
+  }
+
+  /**
+   * Builds the default resource: 4 * GB memory, 4 vcores, 15 * GB of
+   * {@code custom_res_1} and 20 * GB of {@code custom_res_2}.
+   */
+  private void createDefaultResource() {
+    defaultResource = newResource(4 * GB, 4,
+        ImmutableMap.<String, String> builder()
+            .put(CUSTOM_RES_1, String.valueOf(15 * GB))
+            .put(CUSTOM_RES_2, String.valueOf(20 * GB))
+            .build());
+  }
+
+  /**
+   * Registers memory and vcores (with the default scheduler minimum/maximum
+   * allocations) plus the two custom resource types (bounds 0 and 2000,
+   * using the vcores units).
+   */
+  private void initializeResourceTypes() {
+    Map<String, ResourceInformation> riMap = new HashMap<>();
+
+    ResourceInformation memory = ResourceInformation.newInstance(
+        ResourceInformation.MEMORY_MB.getName(),
+        ResourceInformation.MEMORY_MB.getUnits(),
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    ResourceInformation vcores = ResourceInformation.newInstance(
+        ResourceInformation.VCORES.getName(),
+        ResourceInformation.VCORES.getUnits(),
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+        DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+    ResourceInformation res1 = ResourceInformation.newInstance(CUSTOM_RES_1,
+        ResourceInformation.VCORES.getUnits(), 0, 2000);
+    ResourceInformation res2 = ResourceInformation.newInstance(CUSTOM_RES_2,
+        ResourceInformation.VCORES.getUnits(), 0, 2000);
+
+    riMap.put(ResourceInformation.MEMORY_URI, memory);
+    riMap.put(ResourceInformation.VCORES_URI, vcores);
+    riMap.put(CUSTOM_RES_1, res1);
+    riMap.put(CUSTOM_RES_2, res2);
+    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+  }
+
+  /**
+   * Asserts that custom resource {@code resourceName} of the
+   * {@link Resource} that {@code func} extracts from {@code metrics}
+   * equals {@code expectedValue}.
+   *
+   * @param metrics the queue metrics under test
+   * @param metricsType metric family, used only in the failure message
+   * @param func extracts the resource to inspect (e.g. pending, allocated)
+   * @param resourceName name of the custom resource type
+   * @param expectedValue the expected value
+   */
+  private static void assertCustomResourceValue(QueueMetrics metrics,
+      MetricsForCustomResource metricsType,
+      Function<QueueMetrics, Resource> func,
+      String resourceName,
+      long expectedValue) {
+    Resource res = func.apply(metrics);
+    Long value = res.getResourceValue(resourceName);
+    assertCustomResourceValueInternal(metricsType, resourceName,
+        expectedValue, value);
+  }
+
+  /**
+   * Asserts that {@code value} is non-null and equals
+   * {@code expectedValue}. {@code metricsType} and {@code resourceName} only
+   * enrich the failure messages.
+   */
+  private static void assertCustomResourceValueInternal(
+      MetricsForCustomResource metricsType, String resourceName,
+      long expectedValue, Long value) {
+    assertNotNull(
+        "QueueMetrics should have custom resource metrics value " +
+            "for resource: " + resourceName, value);
+    assertEquals(String.format(
+        "QueueMetrics should have custom resource metrics value %d " +
+            "for resource: %s for metrics type %s",
+        expectedValue, resourceName, metricsType), expectedValue,
+        (long) value);
+  }
+
+  /** Returns an immutable map assigning {@code value} to both custom types. */
+  private static Map<String, String> getCustomResourcesWithValue(long value) {
+    return ImmutableMap.<String, String>builder()
+        .put(CUSTOM_RES_1, String.valueOf(value))
+        .put(CUSTOM_RES_2, String.valueOf(value))
+        .build();
+  }
+
+  /** Builds root -> subQ -> subQ2 -> leafQ and returns the leaf. */
+  private QueueInfo createFourLevelQueueHierarchy() {
+    QueueInfo root = new QueueInfo(null, "root", ms, CONF, USER);
+    QueueInfo sub = new QueueInfo(root, "root.subQ", ms, CONF, USER);
+    QueueInfo sub2 = new QueueInfo(sub, "root.subQ2", ms, CONF, USER);
+    return new QueueInfo(sub2, "root.subQ2.leafQ", ms, CONF, USER);
+  }
+
+  /** Builds root -> leaf and returns the leaf. */
+  private QueueInfo createBasicQueueHierarchy() {
+    QueueInfo root = new QueueInfo(null, "root", ms, CONF, USER);
+    return new QueueInfo(root, "root.leaf", ms, CONF, USER);
+  }
+
+  /** Default test data builder preset with {@code containers} containers. */
+  private QueueMetricsTestData.Builder
+      createQueueMetricsTestDataWithContainers(int containers) {
+    return createDefaultQueueMetricsTestData()
+        .withContainers(containers);
+  }
+
+  /** Test data builder preset with {@link #USER} and the default partition. */
+  private QueueMetricsTestData.Builder createDefaultQueueMetricsTestData() {
+    return QueueMetricsTestData.Builder.create()
+        .withUser(USER)
+        .withPartition(RMNodeLabelsManager.NO_LABEL);
+  }
+
+  /** Increases pending resources by the test data's container count. */
+  private void testIncreasePendingResources(QueueMetricsTestData testData) {
+    testIncreasePendingResourcesInternal(testData.containers, testData);
+  }
+
+  /** Increases pending resources for exactly one container. */
+  private void testIncreasePendingResourcesWithoutContainer(
+      QueueMetricsTestData testData) {
+    testIncreasePendingResourcesInternal(1, testData);
+  }
+
+  /**
+   * Increments pending resources on the leaf queue by {@code containers}
+   * times the test resource, then asserts the standard and custom pending
+   * metrics.
+   */
+  private void testIncreasePendingResourcesInternal(int containers,
+      QueueMetricsTestData testData) {
+    testData.leafQueue.queueMetrics.incrPendingResources(testData.partition,
+        testData.user, containers, testData.resource);
+
+    ResourceMetricsChecker checker = ResourceMetricsChecker
+        .create()
+        .gaugeInt(PENDING_CONTAINERS, containers)
+        .gaugeLong(PENDING_MB, containers *
+            testData.resource.getMemorySize())
+        .gaugeInt(PENDING_V_CORES, containers *
+            testData.resource.getVirtualCores());
+    assertAllMetrics(testData.leafQueue, checker,
+        QueueMetrics::getPendingResources,
+        MetricsForCustomResource.PENDING, computeExpectedCustomResourceValues(
+            testData.customResourceValues, (k, v) -> v * containers));
+  }
+
+  /**
+   * Allocates the test data's containers on the leaf queue and asserts the
+   * allocated metrics. When {@code decreasePending} is true, also asserts
+   * that the custom pending values dropped to zero.
+   */
+  private void testAllocateResources(boolean decreasePending,
+      QueueMetricsTestData testData) {
+    testData.leafQueue.queueMetrics.allocateResources(testData.partition,
+        testData.user, testData.containers, testData.resource, decreasePending);
+
+    ResourceMetricsChecker checker = ResourceMetricsChecker
+        .create()
+        .gaugeInt(ALLOCATED_CONTAINERS, testData.containers)
+        .counter(AGGREGATE_CONTAINERS_ALLOCATED, testData.containers)
+        .gaugeLong(ALLOCATED_MB, testData.containers *
+            testData.resource.getMemorySize())
+        .gaugeInt(ALLOCATED_V_CORES, testData.containers *
+            testData.resource.getVirtualCores())
+        .gaugeInt(PENDING_CONTAINERS, 0)
+        .gaugeLong(PENDING_MB, 0)
+        .gaugeInt(PENDING_V_CORES, 0)
+        .checkAgainst(testData.leafQueue.queueSource);
+    if (decreasePending) {
+      assertAllMetrics(testData.leafQueue, checker,
+          QueueMetrics::getPendingResources,
+          MetricsForCustomResource.PENDING,
+          computeExpectedCustomResourceValues(testData.customResourceValues,
+              (k, v) -> 0L));
+    }
+    if (!testData.customResourceValues.isEmpty()) {
+      assertAllMetrics(testData.leafQueue, checker,
+          QueueMetrics::getAllocatedResources,
+          MetricsForCustomResource.ALLOCATED,
+          computeExpectedCustomResourceValues(testData.customResourceValues,
+              (k, v) -> v * testData.containers));
+    }
+  }
+
+  /**
+   * Records {@code seconds} of preemption for the test resource (memory,
+   * vcores and custom resources) and asserts the aggregate preempted
+   * counters on the queue metrics.
+   */
+  private void testUpdatePreemptedSeconds(QueueMetricsTestData testData,
+      int seconds) {
+    testData.leafQueue.queueMetrics.updatePreemptedMemoryMBSeconds(
+        testData.resource.getMemorySize() * seconds);
+    testData.leafQueue.queueMetrics.updatePreemptedVcoreSeconds(
+        testData.resource.getVirtualCores() * seconds);
+    testData.leafQueue.queueMetrics.updatePreemptedSecondsForCustomResources(
+        testData.resource, seconds);
+
+    ResourceMetricsChecker checker = ResourceMetricsChecker
+        .create()
+        .counter(AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED,
+            testData.resource.getMemorySize() * seconds)
+        .counter(AGGREGATE_VCORE_SECONDS_PREEMPTED,
+            testData.resource.getVirtualCores() * seconds);
+
+    assertQueueMetricsOnly(testData.leafQueue, checker,
+        this::convertPreemptedSecondsToResource,
+        MetricsForCustomResource.AGGREGATE_PREEMPTED_SECONDS,
+        computeExpectedCustomResourceValues(testData.customResourceValues,
+            (k, v) -> v * seconds));
+  }
+
+  /**
+   * Packs the aggregate preempted memory-MB-seconds, vcore-seconds and
+   * custom-resource seconds of {@code qm} into a {@link Resource}, so that
+   * the generic custom-resource assertions can read them.
+   */
+  private Resource convertPreemptedSecondsToResource(QueueMetrics qm) {
+    QueueMetricsCustomResource customValues = qm
+        .getAggregatedPreemptedSecondsResources();
+    MutableCounterLong vcoreSeconds = qm
+        .getAggregateVcoreSecondsPreempted();
+    MutableCounterLong memorySeconds = qm
+        .getAggregateMemoryMBSecondsPreempted();
+    // the int narrowing is safe only for the small values used in these tests
+    return Resource.newInstance(
+        memorySeconds.value(), (int) vcoreSeconds.value(),
+        customValues.getValues());
+  }
+
+  /** Reserves the test resource once and asserts the reserved metrics. */
+  private void testReserveResources(QueueMetricsTestData testData) {
+    testData.leafQueue.queueMetrics.reserveResource(testData.partition,
+        testData.user, testData.resource);
+
+    ResourceMetricsChecker checker = ResourceMetricsChecker
+        .create()
+        .gaugeInt(RESERVED_CONTAINERS, 1)
+        .gaugeLong(RESERVED_MB, testData.resource.getMemorySize())
+        .gaugeInt(RESERVED_V_CORES, testData.resource.getVirtualCores())
+        .checkAgainst(testData.leafQueue.queueSource);
+    assertAllMetrics(testData.leafQueue, checker,
+        QueueMetrics::getReservedResources,
+        MetricsForCustomResource.RESERVED,
+        computeExpectedCustomResourceValues(
+            testData.customResourceValues, (k, v) -> v));
+  }
+
+  /**
+   * Allocates resources (without decreasing pending) and checks that
+   * {@link QueueMetrics#getAllocatedResources()} reports every expected
+   * custom resource multiplied by the container count.
+   */
+  private void testGetAllocatedResources(QueueMetricsTestData testData) {
+    testAllocateResources(false, testData);
+
+    Resource res = testData.leafQueue.queueMetrics.getAllocatedResources();
+    for (Map.Entry<String, Long> expected
+        : testData.customResourceValues.entrySet()) {
+      String resourceName = expected.getKey();
+      assertCustomResourceValueInternal(MetricsForCustomResource.ALLOCATED,
+          resourceName, expected.getValue() * testData.containers,
+          res.getResourceValue(resourceName));
+    }
+  }
+
+  /**
+   * Asserts the queue-level metrics (see
+   * {@link #assertAllQueueMetrics}) and then runs {@code checker} against
+   * the leaf queue's and the root queue's user sources.
+   */
+  private void assertAllMetrics(QueueInfo queueInfo,
+      ResourceMetricsChecker checker,
+      Function<QueueMetrics, Resource> func,
+      MetricsForCustomResource metricsType,
+      Map<String, Long> expectedCustomResourceValues) {
+    assertAllQueueMetrics(queueInfo, checker, func, metricsType,
+        expectedCustomResourceValues);
+
+    //assert leaf and root userSources
+    ResourceMetricsChecker leafUserChecker = ResourceMetricsChecker
+        .createFromChecker(checker)
+        .checkAgainst(queueInfo.userSource);
+    ResourceMetricsChecker.createFromChecker(leafUserChecker)
+        .checkAgainst(queueInfo.getRoot().userSource);
+  }
+
+  /**
+   * Asserts only the queue-level metrics, skipping user sources. Used for
+   * the preempted-seconds metrics.
+   */
+  private void assertQueueMetricsOnly(QueueInfo queueInfo,
+      ResourceMetricsChecker checker,
+      Function<QueueMetrics, Resource> func,
+      MetricsForCustomResource metricsType,
+      Map<String, Long> expectedCustomResourceValues) {
+    assertAllQueueMetrics(queueInfo, checker, func, metricsType,
+        expectedCustomResourceValues);
+  }
+
+  /**
+   * Runs {@code checker} against every queue source reachable from
+   * {@code queueInfo}. Also asserts each expected custom resource value
+   * against every reachable {@link QueueMetrics}.
+   *
+   * @param expectedCustomResourceValues custom resource name to expected
+   *        value; every entry is asserted, so a missing entry can no longer
+   *        cause an unboxing NullPointerException
+   */
+  private void assertAllQueueMetrics(QueueInfo queueInfo,
+      ResourceMetricsChecker checker,
+      Function<QueueMetrics, Resource> func,
+      MetricsForCustomResource metricsType,
+      Map<String, Long> expectedCustomResourceValues) {
+    // assert normal resource metrics values
+    queueInfo.checkAllQueueSources(qs -> ResourceMetricsChecker
+        .createFromChecker(checker).checkAgainst(qs));
+
+    // assert custom resource metrics values
+    queueInfo.checkAllQueueMetrics(qm -> {
+      for (Map.Entry<String, Long> expected
+          : expectedCustomResourceValues.entrySet()) {
+        assertCustomResourceValue(qm, metricsType, func, expected.getKey(),
+            expected.getValue());
+      }
+    });
+  }
+
+  /**
+   * Returns a new map holding {@code func(name, value)} for every entry
+   * of {@code customResourceValues}.
+   */
+  private Map<String, Long> computeExpectedCustomResourceValues(
+      Map<String, Long> customResourceValues,
+      BiFunction<String, Long, Long> func) {
+    Map<String, Long> values = Maps.newHashMap();
+    for (Map.Entry<String, Long> res : customResourceValues.entrySet()) {
+      values.put(res.getKey(), func.apply(res.getKey(), res.getValue()));
+    }
+    return values;
+  }
+
+  /** Single-argument setAvailableResourcesToQueue overload. */
+  @Test
+  public void testSetAvailableResourcesToQueue1() {
+    String queueName = "single";
+    QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null,
+        false, CONF);
+    MetricsSource queueSource = queueSource(ms, queueName);
+
+    metrics.setAvailableResourcesToQueue(newResource(
+        GB, 4,
+        ImmutableMap.<String, String> builder()
+            .put(CUSTOM_RES_1, String.valueOf(5 * GB))
+            .put(CUSTOM_RES_2, String.valueOf(6 * GB))
+            .build()));
+    ResourceMetricsChecker.create()
+        .gaugeLong(AVAILABLE_MB, GB)
+        .gaugeInt(AVAILABLE_V_CORES, 4)
+        .checkAgainst(queueSource);
+
+    assertCustomResourceValue(metrics,
+        MetricsForCustomResource.AVAILABLE,
+        QueueMetrics::getAvailableResources, CUSTOM_RES_1, 5 * GB);
+    assertCustomResourceValue(metrics,
+        MetricsForCustomResource.AVAILABLE,
+        QueueMetrics::getAvailableResources, CUSTOM_RES_2, 6 * GB);
+  }
+
+  /** Partition-aware overload, called with a null partition. */
+  @Test
+  public void testSetAvailableResourcesToQueue2() {
+    String queueName = "single";
+    QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null,
+        false, CONF);
+    MetricsSource queueSource = queueSource(ms, queueName);
+
+    metrics.setAvailableResourcesToQueue(null,
+        newResource(GB, 4,
+            ImmutableMap.<String, String> builder()
+                .put(CUSTOM_RES_1, String.valueOf(15 * GB))
+                .put(CUSTOM_RES_2, String.valueOf(20 * GB))
+                .build()));
+    ResourceMetricsChecker.create()
+        .gaugeLong(AVAILABLE_MB, GB)
+        .gaugeInt(AVAILABLE_V_CORES, 4)
+        .checkAgainst(queueSource);
+
+    assertCustomResourceValue(metrics,
+        MetricsForCustomResource.AVAILABLE,
+        QueueMetrics::getAvailableResources, CUSTOM_RES_1, 15 * GB);
+    assertCustomResourceValue(metrics,
+        MetricsForCustomResource.AVAILABLE,
+        QueueMetrics::getAvailableResources, CUSTOM_RES_2, 20 * GB);
+  }
+
+  @Test
+  public void testIncreasePendingResources() {
+    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
+        .withLeafQueue(createBasicQueueHierarchy())
+        .withResourceToDecrease(
+            newResource(GB, 2, getCustomResourcesWithValue(2 * GB)), 2)
+        .withResources(defaultResource)
+        .build();
+
+    testIncreasePendingResources(testData);
+  }
+
+  @Test
+  public void testDecreasePendingResources() {
+    Resource resourceToDecrease =
+        newResource(GB, 2, getCustomResourcesWithValue(2 * GB));
+    int containersToDecrease = 2;
+    int containers = 5;
+    QueueMetricsTestData testData =
+        createQueueMetricsTestDataWithContainers(containers)
+            .withLeafQueue(createBasicQueueHierarchy())
+            .withResourceToDecrease(resourceToDecrease, containersToDecrease)
+            .withResources(defaultResource)
+            .build();
+
+    //compute expected values
+    final int vCoresToDecrease = resourceToDecrease.getVirtualCores();
+    final long memoryMBToDecrease = resourceToDecrease.getMemorySize();
+    final int containersAfterDecrease = containers - containersToDecrease;
+    final int vcoresAfterDecrease =
+        (defaultResource.getVirtualCores() * containers)
+            - (vCoresToDecrease * containersToDecrease);
+    final long memoryAfterDecrease =
+        (defaultResource.getMemorySize() * containers)
+            - (memoryMBToDecrease * containersToDecrease);
+
+    //first, increase resources to be able to decrease some
+    testIncreasePendingResources(testData);
+
+    //decrease resources
+    testData.leafQueue.queueMetrics.decrPendingResources(testData.partition,
+        testData.user, containersToDecrease,
+        ResourceTypesTestHelper.newResource(memoryMBToDecrease,
+            vCoresToDecrease,
+            extractCustomResourcesAsStrings(resourceToDecrease)));
+
+    //check
+    ResourceMetricsChecker checker = ResourceMetricsChecker
+        .create()
+        .gaugeInt(PENDING_CONTAINERS, containersAfterDecrease)
+        .gaugeLong(PENDING_MB, memoryAfterDecrease)
+        .gaugeInt(PENDING_V_CORES, vcoresAfterDecrease)
+        .checkAgainst(testData.leafQueue.queueSource);
+
+    assertAllMetrics(testData.leafQueue, checker,
+        QueueMetrics::getPendingResources,
+        MetricsForCustomResource.PENDING,
+        computeExpectedCustomResourceValues(testData.customResourceValues,
+            (k, v) -> v * containers - (resourceToDecrease.getResourceValue(k)
+                * containersToDecrease)));
+  }
+
+  @Test
+  public void testAllocateResourcesWithoutDecreasePending() {
+    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
+        .withLeafQueue(createBasicQueueHierarchy())
+        .withResources(defaultResource)
+        .build();
+
+    testAllocateResources(false, testData);
+  }
+
+  @Test
+  public void testAllocateResourcesWithDecreasePending() {
+    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
+        .withLeafQueue(createBasicQueueHierarchy())
+        .withResourceToDecrease(
+            newResource(GB, 2, getCustomResourcesWithValue(2 * GB)), 2)
+        .withResources(defaultResource)
+        .build();
+
+    //first, increase pending resources to be able to decrease some
+    testIncreasePendingResources(testData);
+
+    //then allocate with decrease pending resources
+    testAllocateResources(true, testData);
+  }
+
+  @Test
+  public void testAllocateResourcesWithoutContainer() {
+    QueueMetricsTestData testData = createDefaultQueueMetricsTestData()
+        .withLeafQueue(createBasicQueueHierarchy())
+        .withResources(defaultResource)
+        .build();
+
+    //first, increase pending resources
+    testIncreasePendingResourcesWithoutContainer(testData);
+
+    Resource resource = testData.resource;
+    testData.leafQueue.queueMetrics.allocateResources(testData.partition,
+        testData.user, resource);
+
+    ResourceMetricsChecker checker = ResourceMetricsChecker.create()
+        .gaugeLong(ALLOCATED_MB, resource.getMemorySize())
+        .gaugeInt(ALLOCATED_V_CORES, resource.getVirtualCores())
+        .gaugeInt(PENDING_CONTAINERS, 1)
+        .gaugeLong(PENDING_MB, 0)
+        .gaugeInt(PENDING_V_CORES, 0);
+
+    checker.checkAgainst(testData.leafQueue.queueSource);
+    checker.checkAgainst(testData.leafQueue.getRoot().queueSource);
+
+    assertAllMetrics(testData.leafQueue, checker,
+        QueueMetrics::getPendingResources,
+        MetricsForCustomResource.PENDING, computeExpectedCustomResourceValues(
+            testData.customResourceValues, (k, v) -> 0L));
+    assertAllMetrics(testData.leafQueue, checker,
+        QueueMetrics::getAllocatedResources,
+        MetricsForCustomResource.ALLOCATED, computeExpectedCustomResourceValues(
+            testData.customResourceValues, (k, v) -> v));
+  }
+
+  @Test
+  public void testReleaseResources() {
+    int containers = 5;
+    QueueMetricsTestData testData =
+        createQueueMetricsTestDataWithContainers(containers)
+            .withLeafQueue(createBasicQueueHierarchy())
+            .withResourceToDecrease(defaultResource, containers)
+            .withResources(defaultResource)
+            .build();
+
+    //first, allocate some resources so that we can release some
+    testAllocateResources(false, testData);
+
+    testData.leafQueue.queueMetrics.releaseResources(testData.partition,
+        testData.user, containers, defaultResource);
+
+    ResourceMetricsChecker checker = ResourceMetricsChecker
+        .create()
+        .counter(AGGREGATE_CONTAINERS_ALLOCATED, containers)
+        .counter(AGGREGATE_CONTAINERS_RELEASED, containers)
+        .checkAgainst(testData.leafQueue.queueSource);
+    assertAllMetrics(testData.leafQueue, checker,
+        QueueMetrics::getAllocatedResources,
+        MetricsForCustomResource.ALLOCATED, computeExpectedCustomResourceValues(
+            testData.customResourceValues, (k, v) -> 0L));
+  }
+
+  @Test
+  public void testUpdatePreemptedSecondsForCustomResources() {
+    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
+        .withLeafQueue(createFourLevelQueueHierarchy())
+        .withResources(defaultResource)
+        .build();
+
+    final int seconds = 1;
+    testUpdatePreemptedSeconds(testData, seconds);
+  }
+
+  @Test
+  public void testUpdatePreemptedSecondsForCustomResourcesMoreSeconds() {
+    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
+        .withLeafQueue(createFourLevelQueueHierarchy())
+        .withResources(defaultResource)
+        .build();
+
+    final int seconds = 15;
+    testUpdatePreemptedSeconds(testData, seconds);
+  }
+
+  @Test
+  public void testReserveResources() {
+    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
+        .withLeafQueue(createBasicQueueHierarchy())
+        .withResources(defaultResource)
+        .build();
+
+    testReserveResources(testData);
+  }
+
+  @Test
+  public void testUnreserveResources() {
+    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
+        .withLeafQueue(createBasicQueueHierarchy())
+        .withResources(defaultResource)
+        .build();
+
+    testReserveResources(testData);
+
+    testData.leafQueue.queueMetrics.unreserveResource(testData.partition,
+        testData.user, defaultResource);
+
+    ResourceMetricsChecker checker = ResourceMetricsChecker
+        .create()
+        .gaugeInt(RESERVED_CONTAINERS, 0)
+        .gaugeLong(RESERVED_MB, 0)
+        .gaugeInt(RESERVED_V_CORES, 0)
+        .checkAgainst(testData.leafQueue.queueSource);
+    assertAllMetrics(testData.leafQueue, checker,
+        QueueMetrics::getReservedResources,
+        MetricsForCustomResource.RESERVED, computeExpectedCustomResourceValues(
+            testData.customResourceValues, (k, v) -> 0L));
+  }
+
+  @Test
+  public void testGetAllocatedResourcesWithCustomResources() {
+    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
+        .withLeafQueue(createBasicQueueHierarchy())
+        .withResources(defaultResource)
+        .build();
+
+    testGetAllocatedResources(testData);
+  }
+
+  @Test
+  public void testGetAllocatedResourcesWithoutCustomResources() {
+    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
+        .withResources(newResource(4 * GB, 4, Collections.emptyMap()))
+        .withLeafQueue(createBasicQueueHierarchy())
+        .build();
+
+    testGetAllocatedResources(testData);
+  }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org