You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text rendering.)
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2020/06/01 17:59:08 UTC
[hadoop] branch branch-2.10 updated: YARN-6492. Generate queue metrics for each partition. Contributed by Manikandan R
This is an automated email from the ASF dual-hosted git repository.
jhung pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new b9a0f99 YARN-6492. Generate queue metrics for each partition. Contributed by Manikandan R
b9a0f99 is described below
commit b9a0f999662161958bf67551a450a743d14fe648
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Mon Jun 1 10:48:41 2020 -0700
YARN-6492. Generate queue metrics for each partition. Contributed by Manikandan R
---
.../hadoop/yarn/util/resource/Resources.java | 13 +
.../scheduler/AppSchedulingInfo.java | 28 +-
.../scheduler/ContainerUpdateContext.java | 12 +-
.../scheduler/PartitionQueueMetrics.java | 89 +++
.../resourcemanager/scheduler/QueueMetrics.java | 552 ++++++++++-----
.../scheduler/SchedulerApplicationAttempt.java | 6 +-
.../scheduler/capacity/CSQueueMetrics.java | 7 +-
.../scheduler/capacity/LeafQueue.java | 34 +-
.../scheduler/common/fica/FiCaSchedulerApp.java | 3 +-
.../scheduler/fair/FSAppAttempt.java | 2 +-
.../scheduler/fifo/FifoAppAttempt.java | 2 +-
.../scheduler/TestAppSchedulingInfo.java | 7 +-
.../scheduler/TestPartitionQueueMetrics.java | 752 +++++++++++++++++++++
.../scheduler/TestSchedulerApplicationAttempt.java | 4 +-
.../scheduler/capacity/TestLeafQueue.java | 4 +-
.../capacity/TestNodeLabelContainerAllocation.java | 405 ++++++++++-
16 files changed, 1702 insertions(+), 218 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index cc16b9f..010e6a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -304,6 +304,19 @@ public class Resources {
}
return lhs;
}
+
+ /**
+ * Subtract {@code rhs} from {@code lhs} and reset any negative values to
+ * zero. This call will operate on a copy of {@code lhs}, leaving {@code lhs}
+ * unmodified.
+ *
+ * @param lhs {@link Resource} to subtract from
+ * @param rhs {@link Resource} to subtract
+ * @return the value of lhs after subtraction
+ */
+ public static Resource subtractNonNegative(Resource lhs, Resource rhs) {
+ return subtractFromNonNegative(clone(lhs), rhs);
+ }
public static Resource negate(Resource resource) {
return subtract(NONE, resource);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 1a6d48f..aebfe30 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -89,10 +90,12 @@ public class AppSchedulingInfo {
private final ReentrantReadWriteLock.WriteLock writeLock;
public final ContainerUpdateContext updateContext;
+
+ private final RMContext rmContext;
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
String user, Queue queue, AbstractUsersManager abstractUsersManager,
- long epoch, ResourceUsage appResourceUsage) {
+ long epoch, ResourceUsage appResourceUsage, RMContext rmContext) {
this.applicationAttemptId = appAttemptId;
this.applicationId = appAttemptId.getApplicationId();
this.queue = queue;
@@ -106,6 +109,7 @@ public class AppSchedulingInfo {
updateContext = new ContainerUpdateContext(this);
readLock = lock.readLock();
writeLock = lock.writeLock();
+ this.rmContext = rmContext;
}
public ApplicationId getApplicationId() {
@@ -437,7 +441,7 @@ public class AppSchedulingInfo {
public List<ResourceRequest> allocate(NodeType type,
SchedulerNode node, SchedulerRequestKey schedulerKey,
- Container containerAllocated) {
+ RMContainer containerAllocated) {
try {
writeLock.lock();
@@ -593,7 +597,7 @@ public class AppSchedulingInfo {
}
private void updateMetricsForAllocatedContainer(NodeType type,
- SchedulerNode node, Container containerAllocated) {
+ SchedulerNode node, RMContainer containerAllocated) {
QueueMetrics metrics = queue.getMetrics();
if (pending) {
// once an allocation is done we assume the application is
@@ -604,14 +608,16 @@ public class AppSchedulingInfo {
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationId=" + applicationId + " container="
- + containerAllocated.getId() + " host=" + containerAllocated
- .getNodeId().toString() + " user=" + user + " resource="
- + containerAllocated.getResource() + " type="
- + type);
+ + containerAllocated.getContainer().getId() + " host="
+ + containerAllocated.getContainer().getNodeId().toString() + " user="
+ + user + " resource="
+ + containerAllocated.getContainer().getResource() + " type=" + type);
}
- if(node != null) {
+ if (node != null) {
metrics.allocateResources(node.getPartition(), user, 1,
- containerAllocated.getResource(), true);
+ containerAllocated.getContainer().getResource(), false);
+ metrics.decrPendingResources(containerAllocated.getNodeLabelExpression(),
+ user, 1, containerAllocated.getContainer().getResource());
}
metrics.incrNodeTypeAggregations(user, type);
}
@@ -657,4 +663,8 @@ public class AppSchedulingInfo {
this.readLock.unlock();
}
}
+
+ public RMContext getRMContext() {
+ return this.rmContext;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
index 5ac2ac5..93f5891 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
@@ -162,10 +162,16 @@ public class ContainerUpdateContext {
// Decrement the pending using a dummy RR with
// resource = prev update req capability
if (prevReq != null) {
+ Container container = Container.newInstance(UNDEFINED,
+ schedulerNode.getNodeID(), "host:port", prevReq.getCapability(),
+ schedulerKey.getPriority(), null);
appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
- schedulerKey, Container.newInstance(UNDEFINED,
- schedulerNode.getNodeID(), "host:port",
- prevReq.getCapability(), schedulerKey.getPriority(), null));
+ schedulerKey,
+ new RMContainerImpl(container, schedulerKey,
+ appSchedulingInfo.getApplicationAttemptId(),
+ schedulerNode.getNodeID(), appSchedulingInfo.getUser(),
+ appSchedulingInfo.getRMContext(),
+ schedulingPlacementSet.getPrimaryRequestedNodePartition()));
}
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java
new file mode 100644
index 0000000..75e8380
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+
+@Metrics(context = "yarn")
+public class PartitionQueueMetrics extends QueueMetrics {
+
+ private String partition;
+
+ protected PartitionQueueMetrics(MetricsSystem ms, String queueName,
+ Queue parent, boolean enableUserMetrics, Configuration conf,
+ String partition) {
+ super(ms, queueName, parent, enableUserMetrics, conf);
+ this.partition = partition;
+ if (getParentQueue() != null) {
+ String newQueueName = (getParentQueue() instanceof CSQueue)
+ ? ((CSQueue) getParentQueue()).getQueuePath()
+ : getParentQueue().getQueueName();
+ String parentMetricName =
+ partition + METRIC_NAME_DELIMITER + newQueueName;
+ setParent(getQueueMetrics().get(parentMetricName));
+ }
+ }
+
+ /**
+ * Partition * Queue * User Metrics
+ *
+ * Computes Metrics at Partition (Node Label) * Queue * User Level.
+ *
+ * Sample JMX O/P Structure:
+ *
+ * PartitionQueueMetrics (labelX)
+ * QueueMetrics (A)
+ * usermetrics
+ * QueueMetrics (A1)
+ * usermetrics
+ * QueueMetrics (A2)
+ * usermetrics
+ * QueueMetrics (B)
+ * usermetrics
+ *
+ * @return QueueMetrics
+ */
+ @Override
+ public synchronized QueueMetrics getUserMetrics(String userName) {
+ if (users == null) {
+ return null;
+ }
+
+ String partitionJMXStr =
+ (partition.equals(DEFAULT_PARTITION)) ? DEFAULT_PARTITION_JMX_STR
+ : partition;
+
+ QueueMetrics metrics = (PartitionQueueMetrics) users.get(userName);
+ if (metrics == null) {
+ metrics = new PartitionQueueMetrics(this.metricsSystem, this.queueName,
+ null, false, this.conf, this.partition);
+ users.put(userName, metrics);
+ metricsSystem.register(
+ pSourceName(partitionJMXStr).append(qSourceName(queueName))
+ .append(",user=").append(userName).toString(),
+ "Metrics for user '" + userName + "' in queue '" + queueName + "'",
+ ((PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partitionJMXStr)
+ .tag(QUEUE_INFO, queueName)).tag(USER_INFO, userName));
+ }
+ return metrics;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
index b05a0ae..f08fdfc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
@@ -54,6 +53,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
@InterfaceAudience.Private
@@ -114,15 +114,19 @@ public class QueueMetrics implements MetricsSource {
info("Queue", "Metrics by queue");
protected static final MetricsInfo USER_INFO =
info("User", "Metrics by user");
+ protected static final MetricsInfo PARTITION_INFO =
+ info("Partition", "Metrics by partition");
static final Splitter Q_SPLITTER =
Splitter.on('.').omitEmptyStrings().trimResults();
protected final MetricsRegistry registry;
protected final String queueName;
- protected final QueueMetrics parent;
+ private QueueMetrics parent;
+ private final Queue parentQueue;
protected final MetricsSystem metricsSystem;
protected final Map<String, QueueMetrics> users;
protected final Configuration conf;
+ private final boolean enableUserMetrics;
private QueueMetricsForCustomResources queueMetricsForCustomResources;
private static final String ALLOCATED_RESOURCE_METRIC_PREFIX =
@@ -150,6 +154,18 @@ public class QueueMetrics implements MetricsSource {
private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC =
"Aggregate Preempted Seconds for NAME";
+ protected static final MetricsInfo P_RECORD_INFO =
+ info("PartitionQueueMetrics", "Metrics for the resource scheduler");
+
+ // Use "default" to operate NO_LABEL (default) partition internally
+ public static final String DEFAULT_PARTITION = "default";
+
+ // Use "" to register NO_LABEL (default) partition into metrics system
+ public static final String DEFAULT_PARTITION_JMX_STR = "";
+
+ // Metric Name Delimiter
+ public static final String METRIC_NAME_DELIMITER = ".";
+
protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) {
registry = new MetricsRegistry(RECORD_INFO);
@@ -157,6 +173,8 @@ public class QueueMetrics implements MetricsSource {
this.parent = parent != null ? parent.getMetrics() : null;
this.users = enableUserMetrics ? new HashMap<String, QueueMetrics>()
: null;
+ this.parentQueue = parent;
+ this.enableUserMetrics = enableUserMetrics;
metricsSystem = ms;
this.conf = conf;
runningTime = buildBuckets(conf);
@@ -178,12 +196,25 @@ public class QueueMetrics implements MetricsSource {
return sb;
}
- public synchronized
- static QueueMetrics forQueue(String queueName, Queue parent,
- boolean enableUserMetrics,
- Configuration conf) {
+ static StringBuilder pSourceName(String partition) {
+ StringBuilder sb = new StringBuilder(P_RECORD_INFO.name());
+ sb.append(",partition").append('=').append(partition);
+ return sb;
+ }
+
+ static StringBuilder qSourceName(String queueName) {
+ StringBuilder sb = new StringBuilder();
+ int i = 0;
+ for (String node : Q_SPLITTER.split(queueName)) {
+ sb.append(",q").append(i++).append('=').append(node);
+ }
+ return sb;
+ }
+
+ public synchronized static QueueMetrics forQueue(String queueName,
+ Queue parent, boolean enableUserMetrics, Configuration conf) {
return forQueue(DefaultMetricsSystem.instance(), queueName, parent,
- enableUserMetrics, conf);
+ enableUserMetrics, conf);
}
/**
@@ -209,24 +240,20 @@ public class QueueMetrics implements MetricsSource {
return QUEUE_METRICS;
}
- public synchronized
- static QueueMetrics forQueue(MetricsSystem ms, String queueName,
- Queue parent, boolean enableUserMetrics,
- Configuration conf) {
- QueueMetrics metrics = QUEUE_METRICS.get(queueName);
+ public synchronized static QueueMetrics forQueue(MetricsSystem ms,
+ String queueName, Queue parent, boolean enableUserMetrics,
+ Configuration conf) {
+ QueueMetrics metrics = getQueueMetrics().get(queueName);
if (metrics == null) {
- metrics =
- new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf).
- tag(QUEUE_INFO, queueName);
-
+ metrics = new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
+ .tag(QUEUE_INFO, queueName);
+
// Register with the MetricsSystems
if (ms != null) {
- metrics =
- ms.register(
- sourceName(queueName).toString(),
- "Metrics for queue: " + queueName, metrics);
+ metrics = ms.register(sourceName(queueName).toString(),
+ "Metrics for queue: " + queueName, metrics);
}
- QUEUE_METRICS.put(queueName, metrics);
+ getQueueMetrics().put(queueName, metrics);
}
return metrics;
@@ -238,7 +265,8 @@ public class QueueMetrics implements MetricsSource {
}
QueueMetrics metrics = users.get(userName);
if (metrics == null) {
- metrics = new QueueMetrics(metricsSystem, queueName, null, false, conf);
+ metrics =
+ new QueueMetrics(metricsSystem, queueName, null, false, conf);
users.put(userName, metrics);
metricsSystem.register(
sourceName(queueName).append(",user=").append(userName).toString(),
@@ -248,6 +276,96 @@ public class QueueMetrics implements MetricsSource {
return metrics;
}
+ /**
+ * Partition * Queue Metrics
+ *
+ * Computes Metrics at Partition (Node Label) * Queue Level.
+ *
+ * Sample JMX O/P Structure:
+ *
+ * PartitionQueueMetrics (labelX)
+ * QueueMetrics (A)
+ * metrics
+ * QueueMetrics (A1)
+ * metrics
+ * QueueMetrics (A2)
+ * metrics
+ * QueueMetrics (B)
+ * metrics
+ *
+ * @param partition
+ * @return QueueMetrics
+ */
+ public synchronized QueueMetrics getPartitionQueueMetrics(String partition) {
+
+ String partitionJMXStr = partition;
+
+ if ((partition == null)
+ || (partition.equals(RMNodeLabelsManager.NO_LABEL))) {
+ partition = DEFAULT_PARTITION;
+ partitionJMXStr = DEFAULT_PARTITION_JMX_STR;
+ }
+
+ String metricName = partition + METRIC_NAME_DELIMITER + this.queueName;
+ QueueMetrics metrics = getQueueMetrics().get(metricName);
+
+ if (metrics == null) {
+ QueueMetrics queueMetrics =
+ new PartitionQueueMetrics(metricsSystem, this.queueName, parentQueue,
+ this.enableUserMetrics, this.conf, partition);
+ metricsSystem.register(
+ pSourceName(partitionJMXStr).append(qSourceName(this.queueName))
+ .toString(),
+ "Metrics for queue: " + this.queueName,
+ queueMetrics.tag(PARTITION_INFO, partitionJMXStr).tag(QUEUE_INFO,
+ this.queueName));
+ getQueueMetrics().put(metricName, queueMetrics);
+ return queueMetrics;
+ } else {
+ return metrics;
+ }
+ }
+
+ /**
+ * Partition Metrics
+ *
+ * Computes Metrics at Partition (Node Label) Level.
+ *
+ * Sample JMX O/P Structure:
+ *
+ * PartitionQueueMetrics (labelX)
+ * metrics
+ *
+ * @param partition
+ * @return QueueMetrics
+ */
+ private QueueMetrics getPartitionMetrics(String partition) {
+
+ String partitionJMXStr = partition;
+ if ((partition == null)
+ || (partition.equals(RMNodeLabelsManager.NO_LABEL))) {
+ partition = DEFAULT_PARTITION;
+ partitionJMXStr = DEFAULT_PARTITION_JMX_STR;
+ }
+
+ String metricName = partition + METRIC_NAME_DELIMITER;
+ QueueMetrics metrics = getQueueMetrics().get(metricName);
+ if (metrics == null) {
+ metrics = new PartitionQueueMetrics(metricsSystem, this.queueName, null,
+ false, this.conf, partition);
+
+ // Register with the MetricsSystems
+ if (metricsSystem != null) {
+ metricsSystem.register(pSourceName(partitionJMXStr).toString(),
+ "Metrics for partition: " + partitionJMXStr,
+ (PartitionQueueMetrics) metrics.tag(PARTITION_INFO,
+ partitionJMXStr));
+ }
+ getQueueMetrics().put(metricName, metrics);
+ }
+ return metrics;
+ }
+
private ArrayList<Integer> parseInts(String value) {
ArrayList<Integer> result = new ArrayList<Integer>();
for(String s: value.split(",")) {
@@ -388,18 +506,37 @@ public class QueueMetrics implements MetricsSource {
*/
public void setAvailableResourcesToQueue(String partition, Resource limit) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
- availableMB.set(limit.getMemorySize());
- availableVCores.set(limit.getVirtualCores());
- if (queueMetricsForCustomResources != null) {
- queueMetricsForCustomResources.setAvailable(limit);
- registerCustomResources(
- queueMetricsForCustomResources.getAvailableValues(),
- AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC);
+ setAvailableResources(limit);
+ }
+ QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+ if (partitionQueueMetrics != null) {
+ partitionQueueMetrics.setAvailableResources(limit);
+ if (this.queueName.equals("root")) {
+ QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+ if (partitionMetrics != null) {
+ partitionMetrics.setAvailableResources(limit);
+ }
}
}
}
/**
+ * Set Available resources with support for resource vectors.
+ *
+ * @param limit
+ */
+ public void setAvailableResources(Resource limit) {
+ availableMB.set(limit.getMemorySize());
+ availableVCores.set(limit.getVirtualCores());
+ if (queueMetricsForCustomResources != null) {
+ queueMetricsForCustomResources.setAvailable(limit);
+ registerCustomResources(
+ queueMetricsForCustomResources.getAvailableValues(),
+ AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC);
+ }
+ }
+
+ /**
* Set available resources. To be called by scheduler periodically as
* resources become available.
* @param limit resource limit
@@ -420,7 +557,15 @@ public class QueueMetrics implements MetricsSource {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
- userMetrics.setAvailableResourcesToQueue(partition, limit);
+ userMetrics.setAvailableResources(limit);
+ }
+ }
+ QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+ if (partitionQueueMetrics != null) {
+ QueueMetrics partitionUserMetrics =
+ partitionQueueMetrics.getUserMetrics(user);
+ if (partitionUserMetrics != null) {
+ partitionUserMetrics.setAvailableResources(limit);
}
}
}
@@ -435,18 +580,46 @@ public class QueueMetrics implements MetricsSource {
*/
public void incrPendingResources(String partition, String user,
int containers, Resource res) {
+
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
- _incrPendingResources(containers, res);
- QueueMetrics userMetrics = getUserMetrics(user);
- if (userMetrics != null) {
- userMetrics.incrPendingResources(partition, user, containers, res);
- }
- if (parent != null) {
- parent.incrPendingResources(partition, user, containers, res);
+ internalIncrPendingResources(partition, user, containers, res);
+ }
+
+ QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+ if (partitionQueueMetrics != null) {
+ partitionQueueMetrics.internalIncrPendingResources(partition, user,
+ containers, res);
+ QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+ if (partitionMetrics != null) {
+ partitionMetrics.incrementPendingResources(containers, res);
}
}
}
+ public void internalIncrPendingResources(String partition, String user,
+ int containers, Resource res) {
+ incrementPendingResources(containers, res);
+ QueueMetrics userMetrics = getUserMetrics(user);
+ if (userMetrics != null) {
+ userMetrics.internalIncrPendingResources(partition, user, containers,
+ res);
+ }
+ if (parent != null) {
+ parent.internalIncrPendingResources(partition, user, containers, res);
+ }
+ }
+
+ private void incrementPendingResources(int containers, Resource res) {
+ pendingContainers.incr(containers);
+ pendingMB.incr(res.getMemorySize() * containers);
+ pendingVCores.incr(res.getVirtualCores() * containers);
+ if (queueMetricsForCustomResources != null) {
+ queueMetricsForCustomResources.increasePending(res, containers);
+ registerCustomResources(queueMetricsForCustomResources.getPendingValues(),
+ PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
+ }
+ }
+
protected Map<String, Long> initAndGetCustomResources() {
Map<String, Long> customResources = new HashMap<String, Long>();
ResourceInformation[] resources = ResourceUtils.getResourceTypesArray();
@@ -505,32 +678,38 @@ public class QueueMetrics implements MetricsSource {
}
}
- private void _incrPendingResources(int containers, Resource res) {
- pendingContainers.incr(containers);
- pendingMB.incr(res.getMemorySize() * containers);
- pendingVCores.incr(res.getVirtualCores() * containers);
- if (queueMetricsForCustomResources != null) {
- queueMetricsForCustomResources.increasePending(res, containers);
- registerCustomResources(queueMetricsForCustomResources.getPendingValues(),
- PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
- }
- }
-
public void decrPendingResources(String partition, String user,
int containers, Resource res) {
+
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
- _decrPendingResources(containers, res);
- QueueMetrics userMetrics = getUserMetrics(user);
- if (userMetrics != null) {
- userMetrics.decrPendingResources(partition, user, containers, res);
- }
- if (parent != null) {
- parent.decrPendingResources(partition, user, containers, res);
+ internalDecrPendingResources(partition, user, containers, res);
+ }
+
+ QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+ if (partitionQueueMetrics != null) {
+ partitionQueueMetrics.internalDecrPendingResources(partition, user,
+ containers, res);
+ QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+ if (partitionMetrics != null) {
+ partitionMetrics.decrementPendingResources(containers, res);
}
}
}
- private void _decrPendingResources(int containers, Resource res) {
+ protected void internalDecrPendingResources(String partition, String user,
+ int containers, Resource res) {
+ decrementPendingResources(containers, res);
+ QueueMetrics userMetrics = getUserMetrics(user);
+ if (userMetrics != null) {
+ userMetrics.internalDecrPendingResources(partition, user, containers,
+ res);
+ }
+ if (parent != null) {
+ parent.internalDecrPendingResources(partition, user, containers, res);
+ }
+ }
+
+ private void decrementPendingResources(int containers, Resource res) {
pendingContainers.decr(containers);
pendingMB.decr(res.getMemorySize() * containers);
pendingVCores.decr(res.getVirtualCores() * containers);
@@ -560,35 +739,62 @@ public class QueueMetrics implements MetricsSource {
}
}
- public void allocateResources(String partition, String user,
- int containers, Resource res, boolean decrPending) {
+ public void allocateResources(String partition, String user, int containers,
+ Resource res, boolean decrPending) {
+
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
- allocatedContainers.incr(containers);
- aggregateContainersAllocated.incr(containers);
-
- allocatedMB.incr(res.getMemorySize() * containers);
- allocatedVCores.incr(res.getVirtualCores() * containers);
- if (queueMetricsForCustomResources != null) {
- queueMetricsForCustomResources.increaseAllocated(res, containers);
- registerCustomResources(
- queueMetricsForCustomResources.getAllocatedValues(),
- ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
- }
+ internalAllocateResources(partition, user, containers, res, decrPending);
+ }
- if (decrPending) {
- _decrPendingResources(containers, res);
- }
- QueueMetrics userMetrics = getUserMetrics(user);
- if (userMetrics != null) {
- userMetrics.allocateResources(partition, user,
- containers, res, decrPending);
- }
- if (parent != null) {
- parent.allocateResources(partition, user, containers, res, decrPending);
+ QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+ if (partitionQueueMetrics != null) {
+ partitionQueueMetrics.internalAllocateResources(partition, user,
+ containers, res, decrPending);
+ QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+ if (partitionMetrics != null) {
+ partitionMetrics.computeAllocateResources(containers, res, decrPending);
}
}
}
+ public void internalAllocateResources(String partition, String user,
+ int containers, Resource res, boolean decrPending) {
+ computeAllocateResources(containers, res, decrPending);
+ QueueMetrics userMetrics = getUserMetrics(user);
+ if (userMetrics != null) {
+ userMetrics.internalAllocateResources(partition, user, containers, res,
+ decrPending);
+ }
+ if (parent != null) {
+ parent.internalAllocateResources(partition, user, containers, res,
+ decrPending);
+ }
+ }
+
+ /**
+ * Allocate Resources for a partition with support for resource vectors.
+ *
+ * @param containers number of containers
+ * @param res resource containing memory size, vcores etc
+ * @param decrPending decides whether to decrease pending resource or not
+ */
+ private void computeAllocateResources(int containers, Resource res,
+ boolean decrPending) {
+ allocatedContainers.incr(containers);
+ aggregateContainersAllocated.incr(containers);
+ allocatedMB.incr(res.getMemorySize() * containers);
+ allocatedVCores.incr(res.getVirtualCores() * containers);
+ if (queueMetricsForCustomResources != null) {
+ queueMetricsForCustomResources.increaseAllocated(res, containers);
+ registerCustomResources(
+ queueMetricsForCustomResources.getAllocatedValues(),
+ ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
+ }
+ if (decrPending) {
+ decrementPendingResources(containers, res);
+ }
+ }
+
/**
* Allocate Resource for container size change.
* @param partition Node Partition
@@ -596,82 +802,81 @@ public class QueueMetrics implements MetricsSource {
* @param res
*/
public void allocateResources(String partition, String user, Resource res) {
+ // NOTE(review): the removed version below guarded this body with a
+ // null/NO_LABEL partition check. The new version applies the change to this
+ // queue's own counters (and its user/parent metrics) for every partition,
+ // and does not touch any partition-scoped metrics. Confirm this is intended
+ // for container size changes on labeled nodes.
- if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
- allocatedMB.incr(res.getMemorySize());
- allocatedVCores.incr(res.getVirtualCores());
- if (queueMetricsForCustomResources != null) {
- queueMetricsForCustomResources.increaseAllocated(res);
- registerCustomResources(
- queueMetricsForCustomResources.getAllocatedValues(),
- ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
- }
+ allocatedMB.incr(res.getMemorySize());
+ allocatedVCores.incr(res.getVirtualCores());
+ if (queueMetricsForCustomResources != null) {
+ queueMetricsForCustomResources.increaseAllocated(res);
+ registerCustomResources(
+ queueMetricsForCustomResources.getAllocatedValues(),
+ ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
+ }
- pendingMB.decr(res.getMemorySize());
- pendingVCores.decr(res.getVirtualCores());
- if (queueMetricsForCustomResources != null) {
- queueMetricsForCustomResources.decreasePending(res);
- registerCustomResources(
- queueMetricsForCustomResources.getPendingValues(),
- PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
- }
+ // The same amount is removed from pending as is added to allocated.
+ pendingMB.decr(res.getMemorySize());
+ pendingVCores.decr(res.getVirtualCores());
+ if (queueMetricsForCustomResources != null) {
+ queueMetricsForCustomResources.decreasePending(res);
+ registerCustomResources(
+ queueMetricsForCustomResources.getPendingValues(),
+ PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
+ }
- QueueMetrics userMetrics = getUserMetrics(user);
- if (userMetrics != null) {
- userMetrics.allocateResources(partition, user, res);
- }
- if (parent != null) {
- parent.allocateResources(partition, user, res);
- }
+ QueueMetrics userMetrics = getUserMetrics(user);
+ if (userMetrics != null) {
+ userMetrics.allocateResources(partition, user, res);
+ }
+ if (parent != null) {
+ parent.allocateResources(partition, user, res);
}
}
- public void releaseResources(String partition,
- String user, int containers, Resource res) {
+ /**
+ * Releases {@code containers} containers of size {@code res}.
+ * For the default partition (null or NO_LABEL) the release is applied to
+ * this queue's own metrics hierarchy through internalReleaseResources.
+ * Separately, when getPartitionQueueMetrics(partition) returns non-null
+ * metrics, the release is applied to those partition-scoped queue metrics
+ * and to the partition-level metrics from getPartitionMetrics(partition).
+ * NOTE(review): presumably getPartitionQueueMetrics returns null for the
+ * default partition so the release is not counted twice; confirm.
+ */
+ public void releaseResources(String partition, String user, int containers,
+ Resource res) {
+
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
- allocatedContainers.decr(containers);
- aggregateContainersReleased.incr(containers);
- allocatedMB.decr(res.getMemorySize() * containers);
- allocatedVCores.decr(res.getVirtualCores() * containers);
- if (queueMetricsForCustomResources != null) {
- queueMetricsForCustomResources.decreaseAllocated(res, containers);
- registerCustomResources(
- queueMetricsForCustomResources.getAllocatedValues(),
- ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
- }
+ internalReleaseResources(partition, user, containers, res);
+ }
- QueueMetrics userMetrics = getUserMetrics(user);
- if (userMetrics != null) {
- userMetrics.releaseResources(partition, user, containers, res);
- }
- if (parent != null) {
- parent.releaseResources(partition, user, containers, res);
+ QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+ if (partitionQueueMetrics != null) {
+ partitionQueueMetrics.internalReleaseResources(partition, user,
+ containers, res);
+ // Partition-level metrics only get the local counter update; they are
+ // not propagated to user or parent metrics.
+ QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+ if (partitionMetrics != null) {
+ partitionMetrics.computeReleaseResources(containers, res);
}
}
}
+  /**
+   * Records a release of {@code containers} containers of size {@code res}
+   * on this metrics object, then forwards it to the per-user metrics of
+   * {@code user} (when present) and to the parent metrics (when present).
+   *
+   * @param partition node partition label; passed through unchanged
+   * @param user user the released containers belonged to
+   * @param containers number of containers released
+   * @param res resource of a single container
+   */
+  public void internalReleaseResources(String partition, String user,
+      int containers, Resource res) {
+    computeReleaseResources(containers, res);
+
+    QueueMetrics perUserMetrics = getUserMetrics(user);
+    if (perUserMetrics != null) {
+      perUserMetrics.internalReleaseResources(partition, user, containers,
+          res);
+    }
+
+    if (parent == null) {
+      return;
+    }
+    parent.internalReleaseResources(partition, user, containers, res);
+  }
+
/**
- * Release Resource for container size change.
+ * Applies a release of containers to the counters of this metrics object
+ * only; user and parent metrics are not updated here.
*
- * @param user
- * @param res
+ * @param containers number of containers
+ * @param res resource containing memory size, vcores etc
*/
- private void releaseResources(String user, Resource res) {
- allocatedMB.decr(res.getMemorySize());
- allocatedVCores.decr(res.getVirtualCores());
+ private void computeReleaseResources(int containers, Resource res) {
+ allocatedContainers.decr(containers);
+ // The aggregate released counter is incremented, not decremented.
+ aggregateContainersReleased.incr(containers);
+ allocatedMB.decr(res.getMemorySize() * containers);
+ allocatedVCores.decr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
- queueMetricsForCustomResources.decreaseAllocated(res);
+ queueMetricsForCustomResources.decreaseAllocated(res, containers);
registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(),
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
}
-
- QueueMetrics userMetrics = getUserMetrics(user);
- if (userMetrics != null) {
- userMetrics.releaseResources(user, res);
- }
- if (parent != null) {
- parent.releaseResources(user, res);
- }
}
public void preemptContainer() {
@@ -721,11 +926,31 @@ public class QueueMetrics implements MetricsSource {
+ /**
+ * Records one reservation of size {@code res}. For the default partition
+ * (null or NO_LABEL) this queue's hierarchy is updated through
+ * internalReserveResources. When getPartitionQueueMetrics(partition) returns
+ * non-null metrics, the partition-scoped queue metrics and the
+ * partition-level metrics are updated as well.
+ */
public void reserveResource(String partition, String user, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
- reserveResource(user, res);
+ internalReserveResources(partition, user, res);
+ }
+ QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+ if (partitionQueueMetrics != null) {
+ partitionQueueMetrics.internalReserveResources(partition, user, res);
+ QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+ if (partitionMetrics != null) {
+ partitionMetrics.incrReserveResources(res);
+ }
}
}
- public void reserveResource(String user, Resource res) {
+ /**
+ * Records one reservation on this metrics object, then forwards it to the
+ * per-user metrics of {@code user} (when present) and to the parent metrics
+ * (when present).
+ */
+ protected void internalReserveResources(String partition, String user,
+ Resource res) {
+ incrReserveResources(res);
+ QueueMetrics userMetrics = getUserMetrics(user);
+ if (userMetrics != null) {
+ userMetrics.internalReserveResources(partition, user, res);
+ }
+ if (parent != null) {
+ parent.internalReserveResources(partition, user, res);
+ }
+ }
+
+ /**
+ * Adds a single reservation of size {@code res} to the reserved counters of
+ * this metrics object only: reservedContainers by one, and reservedMB and
+ * reservedVCores by the resource's size.
+ */
+ public void incrReserveResources(Resource res) {
reservedContainers.incr();
reservedMB.incr(res.getMemorySize());
reservedVCores.incr(res.getVirtualCores());
@@ -735,17 +960,37 @@ public class QueueMetrics implements MetricsSource {
queueMetricsForCustomResources.getReservedValues(),
RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
}
+ }
+
+ public void unreserveResource(String partition, String user, Resource res) {
+ if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+ internalUnReserveResources(partition, user, res);
+ }
+ QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+ if (partitionQueueMetrics != null) {
+ partitionQueueMetrics.internalUnReserveResources(partition, user, res);
+ QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+ if (partitionMetrics != null) {
+ partitionMetrics.decrReserveResource(res);
+ }
+ }
+ }
+
+ /**
+ * Removes one reservation from this metrics object, then forwards the
+ * removal to the per-user metrics of {@code user} (when present) and to the
+ * parent metrics (when present).
+ */
+ protected void internalUnReserveResources(String partition, String user,
+ Resource res) {
+ decrReserveResource(res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
- userMetrics.reserveResource(user, res);
+ userMetrics.internalUnReserveResources(partition, user, res);
}
if (parent != null) {
- parent.reserveResource(user, res);
+ parent.internalUnReserveResources(partition, user, res);
}
}
- private void unreserveResource(String user, Resource res) {
- reservedContainers.decr();
+ /**
+ * Removes a single reservation of size {@code res} from the reserved
+ * counters of this metrics object only: reservedContainers by one, and
+ * reservedMB and reservedVCores by the resource's size.
+ */
+ public void decrReserveResource(Resource res) {
+ int containers = 1;
+ reservedContainers.decr(containers);
reservedMB.decr(res.getMemorySize());
reservedVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
@@ -754,21 +999,8 @@ public class QueueMetrics implements MetricsSource {
queueMetricsForCustomResources.getReservedValues(),
RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
}
- QueueMetrics userMetrics = getUserMetrics(user);
- if (userMetrics != null) {
- userMetrics.unreserveResource(user, res);
- }
- if (parent != null) {
- parent.unreserveResource(user, res);
- }
- }
-
- public void unreserveResource(String partition, String user, Resource res) {
- if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
- unreserveResource(user, res);
- }
}
-
+
+ /** Increments the activeUsers metric by one. */
public void incrActiveUsers() {
activeUsers.incr();
}
@@ -980,4 +1212,12 @@ public class QueueMetrics implements MetricsSource {
QueueMetricsForCustomResources metrics) {
this.queueMetricsForCustomResources = metrics;
}
+
+  /**
+   * Replaces the parent metrics object to which allocations, releases and
+   * reservations recorded here are forwarded.
+   *
+   * @param parent new parent metrics; may be null, in which case nothing is
+   *          forwarded
+   */
+  public void setParent(QueueMetrics parent) {
+    this.parent = parent;
+  }
+
+  /**
+   * Returns the value held in the parentQueue field.
+   * NOTE(review): presumably this is the Queue supplied as parent when these
+   * metrics were created; confirm against the constructor.
+   *
+   * @return the stored parent queue, possibly null
+   */
+  public Queue getParentQueue() {
+    return this.parentQueue;
+  }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 0685f5d..d1e7c5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -206,9 +206,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
RMContext rmContext) {
Preconditions.checkNotNull(rmContext, "RMContext should not be null");
this.rmContext = rmContext;
- this.appSchedulingInfo =
- new AppSchedulingInfo(applicationAttemptId, user, queue,
- abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage);
+ this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user,
+ queue, abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage,
+ this.rmContext);
this.queue = queue;
this.pendingRelease = Collections.newSetFromMap(
new ConcurrentHashMap<ContainerId, Boolean>());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java
index e9a0aaf..106f565 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java
@@ -229,7 +229,7 @@ public class CSQueueMetrics extends QueueMetrics {
public synchronized static CSQueueMetrics forQueue(String queueName,
Queue parent, boolean enableUserMetrics, Configuration conf) {
MetricsSystem ms = DefaultMetricsSystem.instance();
- QueueMetrics metrics = QueueMetrics.getQueueMetrics().get(queueName);
+ QueueMetrics metrics = getQueueMetrics().get(queueName);
if (metrics == null) {
metrics =
new CSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
@@ -241,7 +241,7 @@ public class CSQueueMetrics extends QueueMetrics {
ms.register(sourceName(queueName).toString(), "Metrics for queue: "
+ queueName, metrics);
}
- QueueMetrics.getQueueMetrics().put(queueName, metrics);
+ getQueueMetrics().put(queueName, metrics);
}
return (CSQueueMetrics) metrics;
@@ -254,7 +254,8 @@ public class CSQueueMetrics extends QueueMetrics {
}
CSQueueMetrics metrics = (CSQueueMetrics) users.get(userName);
if (metrics == null) {
- metrics = new CSQueueMetrics(metricsSystem, queueName, null, false, conf);
+ metrics =
+ new CSQueueMetrics(metricsSystem, queueName, null, false, conf);
users.put(userName, metrics);
metricsSystem.register(
sourceName(queueName).append(",user=").append(userName).toString(),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 3cbebd8..f1ec200 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1370,8 +1370,9 @@ public class LeafQueue extends AbstractCSQueue {
: getQueueMaxResource(partition, clusterResource);
Resource headroom = Resources.componentwiseMin(
- Resources.subtract(userLimitResource, user.getUsed(partition)),
- Resources.subtract(currentPartitionResourceLimit,
+ Resources.subtractNonNegative(userLimitResource,
+ user.getUsed(partition)),
+ Resources.subtractNonNegative(currentPartitionResourceLimit,
queueUsage.getUsed(partition)));
// Normalize it before return
headroom =
@@ -1685,12 +1686,17 @@ public class LeafQueue extends AbstractCSQueue {
User user = usersManager.updateUserResourceUsage(userName, resource,
nodePartition, true);
- // Note this is a bit unconventional since it gets the object and modifies
- // it here, rather then using set routine
- Resources.subtractFrom(application.getHeadroom(), resource); // headroom
- metrics.setAvailableResourcesToUser(nodePartition,
- userName, application.getHeadroom());
-
+ Resource partitionHeadroom = Resources.createResource(0, 0);
+ if (metrics.getUserMetrics(userName) != null) {
+ partitionHeadroom = getHeadroom(user,
+ cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
+ getResourceLimitForActiveUsers(userName, clusterResource,
+ nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
+ nodePartition);
+ }
+ metrics.setAvailableResourcesToUser(nodePartition, userName,
+ partitionHeadroom);
+
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName() + " user=" + userName + " used="
+ queueUsage.getUsed(nodePartition) + " numContainers="
@@ -1728,8 +1734,16 @@ public class LeafQueue extends AbstractCSQueue {
User user = usersManager.updateUserResourceUsage(userName, resource,
nodePartition, false);
- metrics.setAvailableResourcesToUser(nodePartition,
- userName, application.getHeadroom());
+ Resource partitionHeadroom = Resources.createResource(0, 0);
+ if (metrics.getUserMetrics(userName) != null) {
+ partitionHeadroom = getHeadroom(user,
+ cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
+ getResourceLimitForActiveUsers(userName, clusterResource,
+ nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
+ nodePartition);
+ }
+ metrics.setAvailableResourcesToUser(nodePartition, userName,
+ partitionHeadroom);
if (LOG.isDebugEnabled()) {
LOG.debug(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 6f37623..ffe2328 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -549,8 +549,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
List<ResourceRequest> requests = appSchedulingInfo.allocate(
allocation.getAllocationLocalityType(),
schedulerContainer.getSchedulerNode(),
- schedulerContainer.getSchedulerRequestKey(),
- schedulerContainer.getRmContainer().getContainer());
+ schedulerContainer.getSchedulerRequestKey(), rmContainer);
((RMContainerImpl) rmContainer).setResourceRequests(requests);
attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 5897914..6e63c87 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -469,7 +469,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
- type, node, schedulerKey, container);
+ type, node, schedulerKey, rmContainer);
this.attemptResourceUsage.incUsed(container.getResource());
getQueue().incUsedResource(container.getResource());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
index 230f66f..e320f2d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
@@ -81,7 +81,7 @@ public class FifoAppAttempt extends FiCaSchedulerApp {
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
- type, node, schedulerKey, container);
+ type, node, schedulerKey, rmContainer);
attemptResourceUsage.incUsed(node.getPartition(),
container.getResource());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
index bb29889..3afa0ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@@ -47,8 +48,9 @@ public class TestAppSchedulingInfo {
FSLeafQueue queue = mock(FSLeafQueue.class);
doReturn("test").when(queue).getQueueName();
+ RMContext rmContext = mock(RMContext.class);
AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
- appAttemptId, "test", queue, null, 0, new ResourceUsage());
+ appAttemptId, "test", queue, null, 0, new ResourceUsage(), rmContext);
appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList<String>(),
new ArrayList<String>());
@@ -118,9 +120,10 @@ public class TestAppSchedulingInfo {
Queue queue = mock(Queue.class);
doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
+ RMContext rmContext = mock(RMContext.class);
AppSchedulingInfo info = new AppSchedulingInfo(
appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
- new ResourceUsage());
+ new ResourceUsage(), rmContext);
Assert.assertEquals(0, info.getSchedulerKeys().size());
Priority pri1 = Priority.newInstance(1);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java
new file mode 100644
index 0000000..eb240d1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java
@@ -0,0 +1,752 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueMetrics;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPartitionQueueMetrics {
+
+ static final int GB = 1024; // MB
+ // Configuration passed to every QueueMetrics.forQueue call in these tests.
+ private static final Configuration CONF = new Configuration();
+
+ // Metrics system created in setUp and shut down in tearDown.
+ private MetricsSystem ms;
+
+  /**
+   * Starts each test from a clean state. It clears the cached queue metrics
+   * through QueueMetrics.clearQueueMetrics() and
+   * PartitionQueueMetrics.clearQueueMetrics(), and creates a fresh metrics
+   * system.
+   */
+  @Before
+  public void setUp() {
+    QueueMetrics.clearQueueMetrics();
+    PartitionQueueMetrics.clearQueueMetrics();
+    this.ms = new MetricsSystemImpl();
+  }
+
+  /** Shuts down the metrics system created in setUp. */
+  @After
+  public void tearDown() {
+    this.ms.shutdown();
+  }
+
+  /**
+   * Structure:
+   * Both queues, q1 and q2, are configured to run only in partition x.
+   *
+   *       root
+   *      /    \
+   *     q1    q2
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSinglePartitionWithSingleLevelQueueMetrics()
+      throws Exception {
+
+    String parentQueueName = "root";
+    Queue parentQueue = mock(Queue.class);
+    String user = "alice";
+
+    QueueMetrics root = QueueMetrics.forQueue(ms, "root", null, true, CONF);
+    when(parentQueue.getMetrics()).thenReturn(root);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+    QueueMetrics q1 =
+        QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
+    QueueMetrics q2 =
+        QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF);
+
+    q1.submitApp(user);
+    q1.submitAppAttempt(user);
+
+    root.setAvailableResourcesToQueue("x",
+        Resources.createResource(200 * GB, 200));
+    q1.setAvailableResourcesToQueue("x",
+        Resources.createResource(100 * GB, 100));
+
+    q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
+
+    MetricsSource partitionSource = partitionSource(ms, "x");
+    MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
+    MetricsSource q1Source = queueSource(ms, "x", "root.q1");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
+
+    q2.incrPendingResources("x", user, 3, Resource.newInstance(1024, 1));
+    MetricsSource q2Source = queueSource(ms, "x", "root.q2");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
+    checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
+  }
+
+  /**
+   * Structure:
+   * Two partitions, x and y. Queue q1 is configured to run in partition x
+   * and queue q2 in partition y.
+   *
+   *       root
+   *      /    \
+   *     q1    q2
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testTwoPartitionWithSingleLevelQueueMetrics() throws Exception {
+
+    String parentQueueName = "root";
+    String user = "alice";
+
+    QueueMetrics root =
+        QueueMetrics.forQueue(ms, parentQueueName, null, false, CONF);
+    Queue parentQueue = mock(Queue.class);
+    when(parentQueue.getMetrics()).thenReturn(root);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+
+    QueueMetrics q1 =
+        QueueMetrics.forQueue(ms, "root.q1", parentQueue, false, CONF);
+    QueueMetrics q2 =
+        QueueMetrics.forQueue(ms, "root.q2", parentQueue, false, CONF);
+
+    AppSchedulingInfo app = mockApp(user);
+    q1.submitApp(user);
+    q1.submitAppAttempt(user);
+
+    root.setAvailableResourcesToQueue("x",
+        Resources.createResource(200 * GB, 200));
+    q1.setAvailableResourcesToQueue("x",
+        Resources.createResource(100 * GB, 100));
+
+    q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
+
+    MetricsSource xPartitionSource = partitionSource(ms, "x");
+    MetricsSource xRootQueueSource = queueSource(ms, "x", parentQueueName);
+    MetricsSource q1Source = queueSource(ms, "x", "root.q1");
+
+    checkResources(xPartitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(xRootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
+
+    root.setAvailableResourcesToQueue("y",
+        Resources.createResource(400 * GB, 400));
+    q2.setAvailableResourcesToQueue("y",
+        Resources.createResource(200 * GB, 200));
+
+    q2.incrPendingResources("y", user, 3, Resource.newInstance(1024, 1));
+
+    MetricsSource yPartitionSource = partitionSource(ms, "y");
+    MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName);
+    MetricsSource q2Source = queueSource(ms, "y", "root.q2");
+
+    checkResources(yPartitionSource, 0, 0, 0, 400 * GB, 400, 3 * GB, 3, 3);
+    checkResources(yRootQueueSource, 0, 0, 0, 400 * GB, 400, 3 * GB, 3, 3);
+    checkResources(q2Source, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
+  }
+
+  /**
+   * Structure:
+   * A single queue, q1, is configured to run in multiple partitions, x and y.
+   * Pending resources are added for two users to check the user-level
+   * metrics in each partition.
+   *
+   *       root
+   *      /
+   *     q1
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testMultiplePartitionWithSingleQueueMetrics() throws Exception {
+
+    String parentQueueName = "root";
+    Queue parentQueue = mock(Queue.class);
+
+    QueueMetrics root =
+        QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
+    when(parentQueue.getMetrics()).thenReturn(root);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+
+    QueueMetrics q1 =
+        QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
+
+    root.setAvailableResourcesToQueue("x",
+        Resources.createResource(200 * GB, 200));
+    root.setAvailableResourcesToQueue("y",
+        Resources.createResource(300 * GB, 300));
+
+    q1.incrPendingResources("x", "test_user", 2, Resource.newInstance(1024, 1));
+
+    MetricsSource partitionSource = partitionSource(ms, "x");
+    MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
+    MetricsSource q1Source = queueSource(ms, "x", "root.q1");
+    MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
+    checkResources(userSource, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
+
+    q1.incrPendingResources("x", "test_user", 3, Resource.newInstance(1024, 1));
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
+    checkResources(q1Source, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
+    checkResources(userSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
+
+    q1.incrPendingResources("x", "test_user1", 4,
+        Resource.newInstance(1024, 1));
+    MetricsSource userSource1 = userSource(ms, "x", "test_user1", "root.q1");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 9 * GB, 9, 9);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 9 * GB, 9, 9);
+    checkResources(q1Source, 0, 0, 0, 0, 0, 9 * GB, 9, 9);
+    checkResources(userSource1, 0, 0, 0, 0, 0, 4 * GB, 4, 4);
+
+    q1.incrPendingResources("y", "test_user1", 6,
+        Resource.newInstance(1024, 1));
+    MetricsSource partitionSourceY = partitionSource(ms, "y");
+    MetricsSource rootQueueSourceY = queueSource(ms, "y", parentQueueName);
+    MetricsSource q1SourceY = queueSource(ms, "y", "root.q1");
+    MetricsSource userSourceY = userSource(ms, "y", "test_user1", "root.q1");
+
+    checkResources(partitionSourceY, 0, 0, 0, 300 * GB, 300, 6 * GB, 6, 6);
+    checkResources(rootQueueSourceY, 0, 0, 0, 300 * GB, 300, 6 * GB, 6, 6);
+    checkResources(q1SourceY, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
+    checkResources(userSourceY, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
+  }
+
+  /**
+   * Verifies partition, queue and user metrics for a three-level hierarchy in
+   * which both q1 and q2 (and their children) have been configured to run in
+   * both partitions, x and y.
+   *
+   * <pre>
+   *            root
+   *          /      \
+   *        q1        q2
+   *       /  \      /  \
+   *     q11  q12  q21  q22
+   * </pre>
+   *
+   * Pending resources requested on a leaf are expected to roll up to every
+   * ancestor queue and to the partition-level metrics. Available resources
+   * are only reported for queues on which they were explicitly set (q12 in x
+   * and q22 in y report 0).
+   *
+   * @throws Exception propagated to JUnit on failure
+   */
+  @Test
+  public void testMultiplePartitionsWithMultiLevelQueuesMetrics()
+      throws Exception {
+
+    String parentQueueName = "root";
+    Queue parentQueue = mock(Queue.class);
+
+    QueueMetrics root =
+        QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+    when(parentQueue.getMetrics()).thenReturn(root);
+
+    QueueMetrics q1 =
+        QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
+    Queue childQueue1 = mock(Queue.class);
+    when(childQueue1.getQueueName()).thenReturn("root.q1");
+    when(childQueue1.getMetrics()).thenReturn(q1);
+
+    QueueMetrics q11 =
+        QueueMetrics.forQueue(ms, "root.q1.q11", childQueue1, true, CONF);
+    QueueMetrics q12 =
+        QueueMetrics.forQueue(ms, "root.q1.q12", childQueue1, true, CONF);
+
+    QueueMetrics q2 =
+        QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF);
+    Queue childQueue2 = mock(Queue.class);
+    when(childQueue2.getQueueName()).thenReturn("root.q2");
+    when(childQueue2.getMetrics()).thenReturn(q2);
+
+    QueueMetrics q21 =
+        QueueMetrics.forQueue(ms, "root.q2.q21", childQueue2, true, CONF);
+    QueueMetrics q22 =
+        QueueMetrics.forQueue(ms, "root.q2.q22", childQueue2, true, CONF);
+
+    // ---- Partition x: only the q1 subtree requests resources ----
+    root.setAvailableResourcesToQueue("x",
+        Resources.createResource(200 * GB, 200));
+    q1.setAvailableResourcesToQueue("x",
+        Resources.createResource(100 * GB, 100));
+    q11.setAvailableResourcesToQueue("x",
+        Resources.createResource(50 * GB, 50));
+
+    q11.incrPendingResources("x", "test_user", 2,
+        Resource.newInstance(1024, 1));
+
+    MetricsSource partitionSource = partitionSource(ms, "x");
+    MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
+    MetricsSource q1Source = queueSource(ms, "x", "root.q1");
+    // test_user's metrics as seen from the intermediate queue root.q1.
+    MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
+    checkResources(userSource, 0, 0, 0, 0 * GB, 0, 2 * GB, 2, 2);
+
+    q11.incrPendingResources("x", "test_user", 4,
+        Resource.newInstance(1024, 1));
+
+    MetricsSource q11Source = queueSource(ms, "x", "root.q1.q11");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 6 * GB, 6, 6);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 6 * GB, 6, 6);
+    checkResources(q11Source, 0, 0, 0, 50 * GB, 50, 6 * GB, 6, 6);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 6 * GB, 6, 6);
+    checkResources(userSource, 0, 0, 0, 0 * GB, 0, 6 * GB, 6, 6);
+
+    // A second user on the same leaf: queue totals grow while test_user's
+    // own metrics stay unchanged.
+    q11.incrPendingResources("x", "test_user1", 5,
+        Resource.newInstance(1024, 1));
+
+    MetricsSource q1UserSource1 = userSource(ms, "x", "test_user1", "root.q1");
+    MetricsSource userSource1 =
+        userSource(ms, "x", "test_user1", "root.q1.q11");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 11 * GB, 11, 11);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 11 * GB, 11, 11);
+    checkResources(q11Source, 0, 0, 0, 50 * GB, 50, 11 * GB, 11, 11);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 11 * GB, 11, 11);
+    checkResources(userSource, 0, 0, 0, 0 * GB, 0, 6 * GB, 6, 6);
+    checkResources(q1UserSource1, 0, 0, 0, 0 * GB, 0, 5 * GB, 5, 5);
+    checkResources(userSource1, 0, 0, 0, 0 * GB, 0, 5 * GB, 5, 5);
+
+    // Sibling leaf q12 has no available resources set in x.
+    q12.incrPendingResources("x", "test_user", 5,
+        Resource.newInstance(1024, 1));
+    MetricsSource q12Source = queueSource(ms, "x", "root.q1.q12");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 16 * GB, 16, 16);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 16 * GB, 16, 16);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 16 * GB, 16, 16);
+    checkResources(q12Source, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
+
+    // ---- Partition y: first the q1 subtree ----
+    root.setAvailableResourcesToQueue("y",
+        Resources.createResource(200 * GB, 200));
+    q1.setAvailableResourcesToQueue("y",
+        Resources.createResource(100 * GB, 100));
+    q12.setAvailableResourcesToQueue("y",
+        Resources.createResource(50 * GB, 50));
+
+    q12.incrPendingResources("y", "test_user", 3,
+        Resource.newInstance(1024, 1));
+
+    MetricsSource yPartitionSource = partitionSource(ms, "y");
+    MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName);
+    MetricsSource q1YSource = queueSource(ms, "y", "root.q1");
+    MetricsSource q12YSource = queueSource(ms, "y", "root.q1.q12");
+
+    checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
+    checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
+    checkResources(q1YSource, 0, 0, 0, 100 * GB, 100, 3 * GB, 3, 3);
+    checkResources(q12YSource, 0, 0, 0, 50 * GB, 50, 3 * GB, 3, 3);
+
+    // ---- Partition y: then the q2 subtree (root's y capacity set above) ----
+    q2.setAvailableResourcesToQueue("y",
+        Resources.createResource(100 * GB, 100));
+    q21.setAvailableResourcesToQueue("y",
+        Resources.createResource(50 * GB, 50));
+
+    q21.incrPendingResources("y", "test_user", 5,
+        Resource.newInstance(1024, 1));
+    MetricsSource q21Source = queueSource(ms, "y", "root.q2.q21");
+    MetricsSource q2YSource = queueSource(ms, "y", "root.q2");
+
+    checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 8 * GB, 8, 8);
+    checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 8 * GB, 8, 8);
+    checkResources(q2YSource, 0, 0, 0, 100 * GB, 100, 5 * GB, 5, 5);
+    checkResources(q21Source, 0, 0, 0, 50 * GB, 50, 5 * GB, 5, 5);
+
+    q22.incrPendingResources("y", "test_user", 6,
+        Resource.newInstance(1024, 1));
+    MetricsSource q22Source = queueSource(ms, "y", "root.q2.q22");
+
+    checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 14 * GB, 14, 14);
+    checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 14 * GB, 14, 14);
+    checkResources(q2YSource, 0, 0, 0, 100 * GB, 100, 11 * GB, 11, 11);
+    checkResources(q22Source, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
+    // The q2 subtree's pending resources must not leak into sibling q1.
+    checkResources(q1YSource, 0, 0, 0, 100 * GB, 100, 3 * GB, 3, 3);
+  }
+
+ /**
+ * Two-level hierarchy (root -> root.leaf) in partition "x" with user metrics
+ * enabled. Verifies that pending, allocated, released and reserved resources
+ * recorded on the leaf queue show up with the same values on the parent
+ * queue, on the partition metrics, and on the user's metrics at both queue
+ * levels. Available resources reflect the value set on each level
+ * (100 GB per queue, 10 GB per user).
+ */
+ @Test
+ public void testTwoLevelWithUserMetrics() {
+ String parentQueueName = "root";
+ String leafQueueName = "root.leaf";
+ String user = "alice";
+ String partition = "x";
+
+ QueueMetrics parentMetrics =
+ QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
+ Queue parentQueue = mock(Queue.class);
+ when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+ when(parentQueue.getMetrics()).thenReturn(parentMetrics);
+ QueueMetrics metrics =
+ QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, CONF);
+ AppSchedulingInfo app = mockApp(user);
+
+ metrics.submitApp(user);
+ metrics.submitAppAttempt(user);
+
+ // Available resources are set separately on each queue and user level.
+ parentMetrics.setAvailableResourcesToQueue(partition,
+ Resources.createResource(100 * GB, 100));
+ metrics.setAvailableResourcesToQueue(partition,
+ Resources.createResource(100 * GB, 100));
+ parentMetrics.setAvailableResourcesToUser(partition, user,
+ Resources.createResource(10 * GB, 10));
+ metrics.setAvailableResourcesToUser(partition, user,
+ Resources.createResource(10 * GB, 10));
+ // 6 pending containers of 3 GB / 3 vcores each => 18 GB pending.
+ metrics.incrPendingResources(partition, user, 6,
+ Resources.createResource(3 * GB, 3));
+
+ MetricsSource partitionSource = partitionSource(ms, partition);
+ MetricsSource parentQueueSource =
+ queueSource(ms, partition, parentQueueName);
+ MetricsSource queueSource = queueSource(ms, partition, leafQueueName);
+ MetricsSource userSource = userSource(ms, partition, user, leafQueueName);
+ // The same user's metrics as seen from the parent queue.
+ MetricsSource userSource1 =
+ userSource(ms, partition, user, parentQueueName);
+
+ checkResources(queueSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, 6, 0,
+ 0, 0);
+ checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18,
+ 6, 0, 0, 0);
+ checkResources(userSource, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0, 0,
+ 0);
+ checkResources(userSource1, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0,
+ 0, 0);
+ checkResources(partitionSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18,
+ 6, 0, 0, 0);
+
+ // NOTE(review): mockApp does not stub getApplicationId(), so the Mockito
+ // mock returns null here and in finishAppAttempt below.
+ metrics.runAppAttempt(app.getApplicationId(), user);
+
+ // Allocate 3 x 1 GB containers: pending goes 18 GB / 6 containers ->
+ // 15 GB / 3 containers. Then reserve one 3 GB container.
+ metrics.allocateResources(partition, user, 3,
+ Resources.createResource(1 * GB, 1), true);
+ metrics.reserveResource(partition, user,
+ Resources.createResource(3 * GB, 3));
+
+ // Available resources are set externally, as they depend on dynamic
+ // configurable cluster/queue resources
+ checkResources(queueSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, 15 * GB, 15,
+ 3, 3 * GB, 3, 1);
+ checkResources(parentQueueSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100,
+ 15 * GB, 15, 3, 3 * GB, 3, 1);
+ checkResources(partitionSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, 15 * GB,
+ 15, 3, 3 * GB, 3, 1);
+ checkResources(userSource, 3 * GB, 3, 3, 3, 0, 10 * GB, 10, 15 * GB, 15, 3,
+ 3 * GB, 3, 1);
+ checkResources(userSource1, 3 * GB, 3, 3, 3, 0, 10 * GB, 10, 15 * GB, 15, 3,
+ 3 * GB, 3, 1);
+
+ // Another 3 x 1 GB allocation: allocated 6 GB / 6 containers,
+ // pending 12 GB / 0 containers.
+ metrics.allocateResources(partition, user, 3,
+ Resources.createResource(1 * GB, 1), true);
+
+ checkResources(queueSource, 6 * GB, 6, 6, 6, 0, 100 * GB, 100, 12 * GB, 12,
+ 0, 3 * GB, 3, 1);
+ checkResources(parentQueueSource, 6 * GB, 6, 6, 6, 0, 100 * GB, 100,
+ 12 * GB, 12, 0, 3 * GB, 3, 1);
+
+ // Release one 2 GB container (allocated 6 GB -> 4 GB, 6 -> 5 containers,
+ // 1 aggregate release) and drop the 3 GB reservation.
+ metrics.releaseResources(partition, user, 1,
+ Resources.createResource(2 * GB, 2));
+ metrics.unreserveResource(partition, user,
+ Resources.createResource(3 * GB, 3));
+ checkResources(queueSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, 12 * GB, 12,
+ 0, 0, 0, 0);
+ checkResources(parentQueueSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100,
+ 12 * GB, 12, 0, 0, 0, 0);
+ checkResources(partitionSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, 12 * GB,
+ 12, 0, 0, 0, 0);
+ checkResources(userSource, 4 * GB, 4, 5, 6, 1, 10 * GB, 10, 12 * GB, 12, 0,
+ 0, 0, 0);
+ checkResources(userSource1, 4 * GB, 4, 5, 6, 1, 10 * GB, 10, 12 * GB, 12, 0,
+ 0, 0, 0);
+
+ // Tear down: finish the attempt and the application.
+ metrics.finishAppAttempt(app.getApplicationId(), app.isPending(),
+ app.getUser());
+
+ metrics.finishApp(user, RMAppState.FINISHED);
+ }
+
+ /**
+ * Three-level hierarchy (root -> root.leaf -> root.leaf.leaf1) spanning two
+ * partitions, "x" and "y", with user metrics enabled.
+ *
+ * Pending requests made on the deepest queue must appear, separately for
+ * each partition, on every ancestor queue, on the partition metrics and on
+ * the user's metrics at every level. Available resources reflect the value
+ * set on each level.
+ *
+ * Unlike most tests here, the queues are created with the forQueue overload
+ * that takes no explicit MetricsSystem. Sources are therefore looked up via
+ * metrics1.getMetricsSystem().
+ */
+ @Test
+ public void testThreeLevelWithUserMetrics() {
+ String parentQueueName = "root";
+ String leafQueueName = "root.leaf";
+ String leafQueueName1 = "root.leaf.leaf1";
+ String user = "alice";
+ String partitionX = "x";
+ String partitionY = "y";
+
+ QueueMetrics parentMetrics =
+ QueueMetrics.forQueue(parentQueueName, null, true, CONF);
+ Queue parentQueue = mock(Queue.class);
+ when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+ when(parentQueue.getMetrics()).thenReturn(parentMetrics);
+ QueueMetrics metrics =
+ QueueMetrics.forQueue(leafQueueName, parentQueue, true, CONF);
+ Queue leafQueue = mock(Queue.class);
+ when(leafQueue.getQueueName()).thenReturn(leafQueueName);
+ when(leafQueue.getMetrics()).thenReturn(metrics);
+ QueueMetrics metrics1 =
+ QueueMetrics.forQueue(leafQueueName1, leafQueue, true, CONF);
+ AppSchedulingInfo app = mockApp(user);
+
+ metrics1.submitApp(user);
+ metrics1.submitAppAttempt(user);
+
+ // Per-level available resources, for both partitions, for queues and user.
+ parentMetrics.setAvailableResourcesToQueue(partitionX,
+ Resources.createResource(200 * GB, 200));
+ parentMetrics.setAvailableResourcesToQueue(partitionY,
+ Resources.createResource(500 * GB, 500));
+ metrics.setAvailableResourcesToQueue(partitionX,
+ Resources.createResource(100 * GB, 100));
+ metrics.setAvailableResourcesToQueue(partitionY,
+ Resources.createResource(400 * GB, 400));
+ metrics1.setAvailableResourcesToQueue(partitionX,
+ Resources.createResource(50 * GB, 50));
+ metrics1.setAvailableResourcesToQueue(partitionY,
+ Resources.createResource(300 * GB, 300));
+ parentMetrics.setAvailableResourcesToUser(partitionX, user,
+ Resources.createResource(20 * GB, 20));
+ parentMetrics.setAvailableResourcesToUser(partitionY, user,
+ Resources.createResource(50 * GB, 50));
+ metrics.setAvailableResourcesToUser(partitionX, user,
+ Resources.createResource(10 * GB, 10));
+ metrics.setAvailableResourcesToUser(partitionY, user,
+ Resources.createResource(40 * GB, 40));
+ metrics1.setAvailableResourcesToUser(partitionX, user,
+ Resources.createResource(5 * GB, 5));
+ metrics1.setAvailableResourcesToUser(partitionY, user,
+ Resources.createResource(30 * GB, 30));
+ // x: 6 containers of 3 GB => 18 GB pending; y: 6 containers of 4 GB =>
+ // 24 GB pending. Both are requested on the deepest queue only.
+ metrics1.incrPendingResources(partitionX, user, 6,
+ Resources.createResource(3 * GB, 3));
+ metrics1.incrPendingResources(partitionY, user, 6,
+ Resources.createResource(4 * GB, 4));
+
+ MetricsSource partitionSourceX =
+ partitionSource(metrics1.getMetricsSystem(), partitionX);
+
+ MetricsSource parentQueueSourceWithPartX =
+ queueSource(metrics1.getMetricsSystem(), partitionX, parentQueueName);
+ MetricsSource queueSourceWithPartX =
+ queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName);
+ MetricsSource queueSource1WithPartX =
+ queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName1);
+ MetricsSource parentUserSourceWithPartX = userSource(metrics1.getMetricsSystem(),
+ partitionX, user, parentQueueName);
+ MetricsSource userSourceWithPartX = userSource(metrics1.getMetricsSystem(),
+ partitionX, user, leafQueueName);
+ MetricsSource userSource1WithPartX = userSource(metrics1.getMetricsSystem(),
+ partitionX, user, leafQueueName1);
+
+ checkResources(partitionSourceX, 0, 0, 0, 0, 0, 200 * GB, 200, 18 * GB, 18,
+ 6, 0, 0, 0);
+ checkResources(parentQueueSourceWithPartX, 0, 0, 0, 0, 0, 200 * GB, 200, 18 * GB,
+ 18, 6, 0, 0, 0);
+
+ checkResources(queueSourceWithPartX, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, 6,
+ 0, 0, 0);
+ checkResources(queueSource1WithPartX, 0, 0, 0, 0, 0, 50 * GB, 50, 18 * GB, 18, 6,
+ 0, 0, 0);
+ checkResources(parentUserSourceWithPartX, 0, 0, 0, 0, 0, 20 * GB, 20, 18 * GB, 18,
+ 6, 0, 0, 0);
+ checkResources(userSourceWithPartX, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0,
+ 0, 0);
+ checkResources(userSource1WithPartX, 0, 0, 0, 0, 0, 5 * GB, 5, 18 * GB, 18, 6, 0,
+ 0, 0);
+
+ // Same checks for partition y; its totals must not mix with partition x.
+ MetricsSource partitionSourceY =
+ partitionSource(metrics1.getMetricsSystem(), partitionY);
+
+ MetricsSource parentQueueSourceWithPartY =
+ queueSource(metrics1.getMetricsSystem(), partitionY, parentQueueName);
+ MetricsSource queueSourceWithPartY =
+ queueSource(metrics1.getMetricsSystem(), partitionY, leafQueueName);
+ MetricsSource queueSource1WithPartY =
+ queueSource(metrics1.getMetricsSystem(), partitionY, leafQueueName1);
+ MetricsSource parentUserSourceWithPartY = userSource(metrics1.getMetricsSystem(),
+ partitionY, user, parentQueueName);
+ MetricsSource userSourceWithPartY = userSource(metrics1.getMetricsSystem(),
+ partitionY, user, leafQueueName);
+ MetricsSource userSource1WithPartY = userSource(metrics1.getMetricsSystem(),
+ partitionY, user, leafQueueName1);
+
+ checkResources(partitionSourceY, 0, 0, 0, 0, 0, 500 * GB, 500, 24 * GB, 24,
+ 6, 0, 0, 0);
+ checkResources(parentQueueSourceWithPartY, 0, 0, 0, 0, 0, 500 * GB, 500, 24 * GB,
+ 24, 6, 0, 0, 0);
+ checkResources(queueSourceWithPartY, 0, 0, 0, 0, 0, 400 * GB, 400, 24 * GB, 24, 6,
+ 0, 0, 0);
+ checkResources(queueSource1WithPartY, 0, 0, 0, 0, 0, 300 * GB, 300, 24 * GB, 24, 6,
+ 0, 0, 0);
+ checkResources(parentUserSourceWithPartY, 0, 0, 0, 0, 0, 50 * GB, 50, 24 * GB, 24,
+ 6, 0, 0, 0);
+ checkResources(userSourceWithPartY, 0, 0, 0, 0, 0, 40 * GB, 40, 24 * GB, 24, 6, 0,
+ 0, 0);
+ checkResources(userSource1WithPartY, 0, 0, 0, 0, 0, 30 * GB, 30, 24 * GB, 24, 6, 0,
+ 0, 0);
+
+ // NOTE(review): runAppAttempt is never called in this test, and
+ // app.isPending() is not stubbed on the mock (Mockito returns false).
+ // The attempt is therefore finished as if it had been running.
+ // TODO confirm this is intended.
+ metrics1.finishAppAttempt(app.getApplicationId(), app.isPending(),
+ app.getUser());
+
+ metrics1.finishApp(user, RMAppState.FINISHED);
+ }
+
+  /**
+   * Verifies single-partition (x) queue metrics for a one-level hierarchy when
+   * user metrics are disabled.
+   *
+   * <pre>
+   *      root
+   *     /    \
+   *   q1      q2
+   * </pre>
+   *
+   * Because every queue is created with user metrics disabled, no per-user
+   * metrics source is registered, and looking one up yields {@code null}.
+   * This is asserted explicitly, rather than expecting the NullPointerException
+   * thrown when such a source is read. That way the q2 checks further down
+   * actually run, and an unrelated NPE cannot make the test pass.
+   *
+   * @throws Exception propagated to JUnit on failure
+   */
+  @Test
+  public void testSinglePartitionWithSingleLevelQueueMetricsWithoutUserMetrics()
+      throws Exception {
+
+    String parentQueueName = "root";
+    Queue parentQueue = mock(Queue.class);
+    String user = "alice";
+
+    QueueMetrics root = QueueMetrics.forQueue("root", null, false, CONF);
+    when(parentQueue.getMetrics()).thenReturn(root);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+    CSQueueMetrics q1 =
+        CSQueueMetrics.forQueue("root.q1", parentQueue, false, CONF);
+    CSQueueMetrics q2 =
+        CSQueueMetrics.forQueue("root.q2", parentQueue, false, CONF);
+
+    AppSchedulingInfo app = mockApp(user);
+
+    q1.submitApp(user);
+    q1.submitAppAttempt(user);
+
+    root.setAvailableResourcesToQueue("x",
+        Resources.createResource(200 * GB, 200));
+
+    q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
+
+    MetricsSource partitionSource = partitionSource(q1.getMetricsSystem(), "x");
+    MetricsSource rootQueueSource =
+        queueSource(q1.getMetricsSystem(), "x", parentQueueName);
+    MetricsSource q1Source = queueSource(q1.getMetricsSystem(), "x", "root.q1");
+    MetricsSource q1UserSource =
+        userSource(q1.getMetricsSystem(), "x", user, "root.q1");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
+    // User metrics are disabled, so no per-user source was registered.
+    org.junit.Assert.assertNull(q1UserSource);
+
+    q2.incrPendingResources("x", user, 3, Resource.newInstance(1024, 1));
+    MetricsSource q2Source = queueSource(q2.getMetricsSystem(), "x", "root.q2");
+    MetricsSource q2UserSource =
+        userSource(q2.getMetricsSystem(), "x", user, "root.q2");
+
+    // Available resources were only set on root, so root and the partition
+    // still report 200 GB. Pending now includes q2's 3 containers.
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
+    checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
+    org.junit.Assert.assertNull(q2UserSource);
+
+    q1.finishAppAttempt(app.getApplicationId(), app.isPending(), app.getUser());
+    q1.finishApp(user, RMAppState.FINISHED);
+  }
+
+  /**
+   * Looks up the partition-level metrics source registered for a partition.
+   *
+   * @param ms metrics system in which the source was registered
+   * @param partition partition (node label) name; "" is the default partition
+   *     as used by callers of this helper
+   * @return the registered source, or whatever {@code ms.getSource} returns
+   *     when nothing is registered under that name (null for the
+   *     implementations these tests use)
+   */
+  public static MetricsSource partitionSource(MetricsSystem ms,
+      String partition) {
+    String sourceName = QueueMetrics.pSourceName(partition).toString();
+    return ms.getSource(sourceName);
+  }
+
+  /**
+   * Looks up the metrics source of a queue within a given partition.
+   *
+   * @param ms metrics system in which the source was registered
+   * @param partition partition (node label) name
+   * @param queue fully qualified queue name, e.g. "root.q1"
+   * @return the registered source as returned by {@code ms.getSource}
+   */
+  public static MetricsSource queueSource(MetricsSystem ms, String partition,
+      String queue) {
+    String sourceName = QueueMetrics.pSourceName(partition)
+        .append(QueueMetrics.qSourceName(queue))
+        .toString();
+    return ms.getSource(sourceName);
+  }
+
+  /**
+   * Looks up the per-user metrics source of a queue within a given partition.
+   *
+   * @param ms metrics system in which the source was registered
+   * @param partition partition (node label) name
+   * @param user user name
+   * @param queue fully qualified queue name, e.g. "root.q1"
+   * @return the registered source as returned by {@code ms.getSource}
+   */
+  public static MetricsSource userSource(MetricsSystem ms, String partition,
+      String user, String queue) {
+    String sourceName = QueueMetrics.pSourceName(partition)
+        .append(QueueMetrics.qSourceName(queue))
+        .append(",user=")
+        .append(user)
+        .toString();
+    return ms.getSource(sourceName);
+  }
+
+  /**
+   * Asserts the allocated, available and pending gauges of a metrics source.
+   * A {@code null} source fails with a NullPointerException when its metrics
+   * are read.
+   *
+   * @param source metrics source to snapshot
+   * @param allocatedMB expected AllocatedMB
+   * @param allocatedCores expected AllocatedVCores
+   * @param allocCtnrs expected AllocatedContainers
+   * @param availableMB expected AvailableMB
+   * @param availableCores expected AvailableVCores
+   * @param pendingMB expected PendingMB
+   * @param pendingCores expected PendingVCores
+   * @param pendingCtnrs expected PendingContainers
+   */
+  public static void checkResources(MetricsSource source, long allocatedMB,
+      int allocatedCores, int allocCtnrs, long availableMB, int availableCores,
+      long pendingMB, int pendingCores, int pendingCtnrs) {
+    final MetricsRecordBuilder snapshot = getMetrics(source);
+
+    // Allocated resources.
+    assertGauge("AllocatedMB", allocatedMB, snapshot);
+    assertGauge("AllocatedVCores", allocatedCores, snapshot);
+    assertGauge("AllocatedContainers", allocCtnrs, snapshot);
+
+    // Available resources.
+    assertGauge("AvailableMB", availableMB, snapshot);
+    assertGauge("AvailableVCores", availableCores, snapshot);
+
+    // Pending resources.
+    assertGauge("PendingMB", pendingMB, snapshot);
+    assertGauge("PendingVCores", pendingCores, snapshot);
+    assertGauge("PendingContainers", pendingCtnrs, snapshot);
+  }
+
+  /**
+   * Creates a Mockito mock of {@link AppSchedulingInfo} for the given user.
+   * Only {@code getUser()} and {@code getApplicationAttemptId()} are stubbed.
+   * The attempt id is attempt 1 of {@code BuilderUtils.newApplicationId(1, 1)}.
+   * Every other method, including {@code getApplicationId()} and
+   * {@code isPending()}, returns Mockito's default (null / false).
+   *
+   * @param user value returned by {@code getUser()}
+   * @return the stubbed mock
+   */
+  private static AppSchedulingInfo mockApp(String user) {
+    ApplicationAttemptId attemptId = BuilderUtils.newApplicationAttemptId(
+        BuilderUtils.newApplicationId(1, 1), 1);
+    AppSchedulingInfo schedulingInfo = mock(AppSchedulingInfo.class);
+    when(schedulingInfo.getUser()).thenReturn(user);
+    when(schedulingInfo.getApplicationAttemptId()).thenReturn(attemptId);
+    return schedulingInfo;
+  }
+
+  /**
+   * Asserts the allocated, aggregate, available, pending and reserved metrics
+   * of a metrics source. A {@code null} source fails with a
+   * NullPointerException when its metrics are read.
+   *
+   * @param source metrics source to snapshot
+   * @param allocatedMB expected AllocatedMB
+   * @param allocatedCores expected AllocatedVCores
+   * @param allocCtnrs expected AllocatedContainers
+   * @param aggreAllocCtnrs expected AggregateContainersAllocated counter
+   * @param aggreReleasedCtnrs expected AggregateContainersReleased counter
+   * @param availableMB expected AvailableMB
+   * @param availableCores expected AvailableVCores
+   * @param pendingMB expected PendingMB
+   * @param pendingCores expected PendingVCores
+   * @param pendingCtnrs expected PendingContainers
+   * @param reservedMB expected ReservedMB
+   * @param reservedCores expected ReservedVCores
+   * @param reservedCtnrs expected ReservedContainers
+   */
+  public static void checkResources(MetricsSource source, long allocatedMB,
+      int allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
+      long aggreReleasedCtnrs, long availableMB, int availableCores,
+      long pendingMB, int pendingCores, int pendingCtnrs, long reservedMB,
+      int reservedCores, int reservedCtnrs) {
+    final MetricsRecordBuilder snapshot = getMetrics(source);
+
+    // Allocated resources (gauges).
+    assertGauge("AllocatedMB", allocatedMB, snapshot);
+    assertGauge("AllocatedVCores", allocatedCores, snapshot);
+    assertGauge("AllocatedContainers", allocCtnrs, snapshot);
+
+    // Lifetime totals (counters).
+    assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, snapshot);
+    assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, snapshot);
+
+    // Available resources.
+    assertGauge("AvailableMB", availableMB, snapshot);
+    assertGauge("AvailableVCores", availableCores, snapshot);
+
+    // Pending resources.
+    assertGauge("PendingMB", pendingMB, snapshot);
+    assertGauge("PendingVCores", pendingCores, snapshot);
+    assertGauge("PendingContainers", pendingCtnrs, snapshot);
+
+    // Reserved resources.
+    assertGauge("ReservedMB", reservedMB, snapshot);
+    assertGauge("ReservedVCores", reservedCores, snapshot);
+    assertGauge("ReservedContainers", reservedCtnrs, snapshot);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
index c110b1c..25062b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
@@ -93,7 +93,7 @@ public class TestSchedulerApplicationAttempt {
app.liveContainers.put(container1.getContainerId(), container1);
SchedulerNode node = createNode();
app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node,
- toSchedulerKey(requestedPriority), container1.getContainer());
+ toSchedulerKey(requestedPriority), container1);
// Active user count has to decrease from queue2 due to app has NO pending requests
assertEquals(0, queue2.getAbstractUsersManager().getNumActiveUsers());
@@ -135,7 +135,7 @@ public class TestSchedulerApplicationAttempt {
app.liveContainers.put(container1.getContainerId(), container1);
SchedulerNode node = createNode();
app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node,
- toSchedulerKey(requestedPriority), container1.getContainer());
+ toSchedulerKey(requestedPriority), container1);
// Reserved container
Priority prio1 = Priority.newInstance(1);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index b2283fb..2245599 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -1204,9 +1204,9 @@ public class TestLeafQueue {
qb.finishApplication(app_0.getApplicationId(), user_0);
qb.finishApplication(app_2.getApplicationId(), user_1);
qb.releaseResource(clusterResource, app_0, Resource.newInstance(4*GB, 1),
- null, null);
+ "", null);
qb.releaseResource(clusterResource, app_2, Resource.newInstance(4*GB, 1),
- null, null);
+ "", null);
qb.setUserLimit(50);
qb.setUserLimitFactor(1);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 54b5047..636cb99 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -25,9 +25,11 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -47,11 +49,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestPartitionQueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -2032,10 +2037,9 @@ public class TestNodeLabelContainerAllocation {
assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB());
assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB());
- // The total memory tracked by QueueMetrics is 0GB for the default partition
CSQueue rootQueue = cs.getRootQueue();
- assertEquals(0*GB, rootQueue.getMetrics().getAvailableMB() +
- rootQueue.getMetrics().getAllocatedMB());
+ assertEquals(0 * GB, rootQueue.getMetrics().getAvailableMB()
+ + rootQueue.getMetrics().getAllocatedMB());
// Kill all apps in queue a
cs.killAllAppsInQueue("a");
@@ -2084,6 +2088,8 @@ public class TestNodeLabelContainerAllocation {
csConf.setCapacityByLabel(queueB, "x", 50);
csConf.setMaximumCapacityByLabel(queueB, "x", 50);
+ csConf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+
// set node -> label
mgr.addToCluserNodeLabels(
ImmutableSet.of(NodeLabel.newInstance("x", false)));
@@ -2102,6 +2108,54 @@ public class TestNodeLabelContainerAllocation {
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <no_label>
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+ SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
+ for (int i = 0; i < 50; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+ for (int i = 0; i < 50; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+ double delta = 0.0001;
+ CSQueue leafQueue = cs.getQueue("a");
+ CSQueue leafQueueB = cs.getQueue("b");
+ CSQueue rootQueue = cs.getRootQueue();
+ assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB(), delta);
+ assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+ assertEquals(7.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta);
+
+ MetricsSystem ms = leafQueueB.getMetrics().getMetricsSystem();
+ QueueMetrics partXMetrics =
+ (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "x");
+ QueueMetrics partDefaultMetrics =
+ (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "");
+ QueueMetrics queueAMetrics =
+ (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a");
+ QueueMetrics queueBMetrics =
+ (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.b");
+ QueueMetrics queueAPartDefaultMetrics =
+ (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a");
+ QueueMetrics queueAPartXMetrics =
+ (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a");
+ QueueMetrics queueBPartDefaultMetrics =
+ (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.b");
+ QueueMetrics queueBPartXMetrics =
+ (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.b");
+ QueueMetrics rootMetrics =
+ (QueueMetrics) TestQueueMetrics.queueSource(ms, "root");
+ assertEquals(10 * GB, partXMetrics.getAvailableMB(), delta);
+ assertEquals(10 * GB, partDefaultMetrics.getAvailableMB(), delta);
+ assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+ assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta);
+ assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+ assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta);
+ assertEquals(10 * GB, rootMetrics.getAvailableMB(), delta);
+ assertEquals(2.5 * GB, queueAMetrics.getAvailableMB(), delta);
+ assertEquals(7.5 * GB, queueBMetrics.getAvailableMB(), delta);
+
// app1 -> a
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
@@ -2109,47 +2163,73 @@ public class TestNodeLabelContainerAllocation {
// app1 asks for 3 partition= containers
am1.allocate("*", 1 * GB, 3, new ArrayList<ContainerId>());
- // NM1 do 50 heartbeats
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
- SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
+ for (int i = 0; i < 50; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
// app1 gets all resource in partition=x (non-exclusive)
Assert.assertEquals(3, schedulerNode1.getNumContainers());
-
SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
.getNodeReport(nm1.getNodeId());
Assert.assertEquals(3 * GB, reportNm1.getUsedResource().getMemorySize());
Assert.assertEquals(7 * GB,
reportNm1.getAvailableResource().getMemorySize());
-
SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
.getNodeReport(nm2.getNodeId());
Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize());
Assert.assertEquals(9 * GB,
reportNm2.getAvailableResource().getMemorySize());
-
- LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
- // 3GB is used from label x quota. 1.5 GB is remaining from default label.
- // 2GB is remaining from label x.
- assertEquals(15 * GB / 10, leafQueue.getMetrics().getAvailableMB());
+ assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta);
+ assertEquals(9 * GB, partDefaultMetrics.getAvailableMB(), delta);
+ assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+ assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
+ assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta);
+ assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+ assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
+ assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta);
+ assertEquals(1 * GB, queueAMetrics.getAllocatedMB(), delta);
+ assertEquals(1.5 * GB, queueAMetrics.getAvailableMB(), delta);
+ assertEquals(0 * GB, queueBMetrics.getAllocatedMB(), delta);
+ assertEquals(7.5 * GB, queueBMetrics.getAvailableMB(), delta);
+ assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta);
+ assertEquals(0 * GB, queueAPartDefaultMetrics.getPendingMB(), delta);
+ assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta);
+ assertEquals(0 * GB, queueBPartDefaultMetrics.getPendingMB(), delta);
+ assertEquals(0 * GB, queueBPartXMetrics.getPendingMB(), delta);
+ assertEquals(1.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB());
+ assertEquals(3 * GB, partXMetrics.getAllocatedMB(), delta);
+ assertEquals(1 * GB, partDefaultMetrics.getAllocatedMB(), delta);
+
+ QueueMetrics partDefaultQueueAUserMetrics =
+ (QueueMetrics) TestPartitionQueueMetrics.userSource(ms, "", "user",
+ "root.a");
+ QueueMetrics partXQueueAUserMetrics =
+ (QueueMetrics) TestPartitionQueueMetrics.userSource(ms, "x", "user",
+ "root.a");
+ QueueMetrics queueAUserMetrics =
+ (QueueMetrics) TestQueueMetrics.userSource(ms, "root.a", "user");
+
+ assertEquals(2 * GB, queueAUserMetrics.getAvailableMB(), delta);
+ assertEquals(1 * GB, queueAUserMetrics.getAllocatedMB(), delta);
+ assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+ assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
+ assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+ assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
+ assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta);
+ assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta);
+ assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
+ assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta);
- // app1 asks for 1 default partition container
am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
- // NM2 do couple of heartbeats
- RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
- SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
- // app1 gets all resource in default partition
Assert.assertEquals(2, schedulerNode2.getNumContainers());
+ Assert.assertEquals(3, schedulerNode1.getNumContainers());
// 3GB is used from label x quota. 2GB used from default label.
// So 0.5 GB is remaining from default label.
@@ -2158,10 +2238,100 @@ public class TestNodeLabelContainerAllocation {
// The total memory tracked by QueueMetrics is 10GB
// for the default partition
- CSQueue rootQueue = cs.getRootQueue();
assertEquals(10*GB, rootQueue.getMetrics().getAvailableMB() +
rootQueue.getMetrics().getAllocatedMB());
+ assertEquals(0.5 * GB, queueAMetrics.getAvailableMB(), delta);
+ assertEquals(2 * GB, queueAMetrics.getAllocatedMB());
+ assertEquals(0.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+ assertEquals(2 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
+ assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+ assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
+ assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAvailableMB(),
+ delta);
+ assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta);
+ assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
+ assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta);
+ assertEquals(1 * GB, queueAUserMetrics.getAvailableMB(), delta);
+ assertEquals(2 * GB, queueAUserMetrics.getAllocatedMB(), delta);
+ assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta);
+ assertEquals(3 * GB, partXMetrics.getAllocatedMB(), delta);
+ assertEquals(8 * GB, partDefaultMetrics.getAvailableMB(), delta);
+ assertEquals(2 * GB, partDefaultMetrics.getAllocatedMB(), delta);
+
+ // Pending Resources when containers are waiting on "default" partition
+ assertEquals(4 * GB, queueAMetrics.getPendingMB(), delta);
+ assertEquals(4 * GB, queueAPartDefaultMetrics.getPendingMB(), delta);
+ assertEquals(4 * GB, partDefaultQueueAUserMetrics.getPendingMB(),
+ delta);
+ assertEquals(4 * GB, queueAUserMetrics.getPendingMB(), delta);
+ assertEquals(4 * GB, partDefaultMetrics.getPendingMB(), delta);
+ assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta);
+ assertEquals(0 * GB, partXQueueAUserMetrics.getPendingMB(), delta);
+ assertEquals(0 * GB, partXMetrics.getPendingMB(), delta);
+
+ for (int i = 0; i < 50; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+ for (int i = 0; i < 50; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ assertEquals(0.5 * GB, queueAMetrics.getAvailableMB(), delta);
+ assertEquals(2 * GB, queueAMetrics.getAllocatedMB());
+ assertEquals(0.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+ assertEquals(2 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
+ assertEquals(0 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+ assertEquals(7 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
+ assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAvailableMB(),
+ delta);
+ assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta);
+ assertEquals(0 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
+ assertEquals(7 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta);
+ assertEquals(1 * GB, queueAUserMetrics.getAvailableMB(), delta);
+ assertEquals(2 * GB, queueAUserMetrics.getAllocatedMB(), delta);
+ assertEquals(3 * GB, partXMetrics.getAvailableMB(), delta);
+ assertEquals(7 * GB, partXMetrics.getAllocatedMB(), delta);
+ assertEquals(8 * GB, partDefaultMetrics.getAvailableMB(), delta);
+ assertEquals(2 * GB, partDefaultMetrics.getAllocatedMB(), delta);
+
+ // Pending Resources after containers has been assigned on "x" partition
+ assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta);
+ assertEquals(0 * GB, queueAPartDefaultMetrics.getPendingMB(), delta);
+ assertEquals(0 * GB, partDefaultQueueAUserMetrics.getPendingMB(),
+ delta);
+ assertEquals(0 * GB, queueAUserMetrics.getPendingMB(), delta);
+ assertEquals(0 * GB, partDefaultMetrics.getPendingMB(), delta);
+ assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta);
+ assertEquals(0 * GB, partXQueueAUserMetrics.getPendingMB(), delta);
+ assertEquals(0 * GB, partXMetrics.getPendingMB(), delta);
+
+ rm1.killApp(app1.getApplicationId());
+ rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+
+ for (int i = 0; i < 50; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+ for (int i = 0; i < 50; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB(), delta);
+ assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+ assertEquals(7.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta);
+ assertEquals(2, queueAMetrics.getAggregateAllocatedContainers());
+ assertEquals(2, queueAMetrics.getAggegatedReleasedContainers());
+ assertEquals(2, queueAPartDefaultMetrics.getAggregateAllocatedContainers());
+ assertEquals(2, queueAPartDefaultMetrics.getAggegatedReleasedContainers());
+ assertEquals(7, partXMetrics.getAggregateAllocatedContainers());
+ assertEquals(2, partDefaultMetrics.getAggregateAllocatedContainers());
+ assertEquals(7, queueAPartXMetrics.getAggregateAllocatedContainers());
+ assertEquals(7, queueAPartXMetrics.getAggegatedReleasedContainers());
+ assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+ assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+ assertEquals(3 * GB, queueAUserMetrics.getAvailableMB(), delta);
+ assertEquals(3 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta);
+ assertEquals(5 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
rm1.close();
}
@@ -2278,8 +2448,8 @@ public class TestNodeLabelContainerAllocation {
// The total memory tracked by QueueMetrics is 12GB
// for the default partition
CSQueue rootQueue = cs.getRootQueue();
- assertEquals(12*GB, rootQueue.getMetrics().getAvailableMB() +
- rootQueue.getMetrics().getAllocatedMB());
+ assertEquals(12 * GB, rootQueue.getMetrics().getAvailableMB()
+ + rootQueue.getMetrics().getAllocatedMB());
// Kill all apps in queue a
cs.killAllAppsInQueue("a");
@@ -2292,6 +2462,193 @@ public class TestNodeLabelContainerAllocation {
}
@Test
+ public void testTwoLevelQueueMetricsWithLabels() throws Exception {
+ // Queue hierarchy root -> a -> a1; a and a1 each get 100% of partition x.
+ CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
+ this.conf);
+
+ // Define the single top-level queue "a"; root owns all of partition x
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[] {"a"});
+ csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+ final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
+ csConf.setCapacity(queueA, 100);
+ csConf.setAccessibleNodeLabels(queueA, toSet("x"));
+ csConf.setCapacityByLabel(queueA, "x", 100);
+ csConf.setMaximumCapacityByLabel(queueA, "x", 100);
+
+ csConf.setQueues(queueA, new String[] {"a1"});
+ final String queueA1 = queueA + ".a1";
+ csConf.setCapacity(queueA1, 100);
+
+ csConf.setAccessibleNodeLabels(queueA1, toSet("x"));
+ csConf.setCapacityByLabel(queueA1, "x", 100);
+ csConf.setMaximumCapacityByLabel(queueA1, "x", 100);
+
+ // set node -> label
+ // label x exclusivity is set to true
+ mgr.addToCluserNodeLabels(
+ ImmutableSet.of(NodeLabel.newInstance("x", true)));
+ mgr.addLabelsToNode(
+ ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB); // label = <no_label>
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ ParentQueue parentQueueA = (ParentQueue) cs.getQueue("a");
+ LeafQueue leafQueueA1 = (LeafQueue) cs.getQueue("a1");
+ assertEquals(12 * GB, leafQueueA1.getMetrics().getAvailableMB());
+ assertEquals(0 * GB, leafQueueA1.getMetrics().getAllocatedMB());
+ MetricsSystem ms = leafQueueA1.getMetrics().getMetricsSystem();
+ QueueMetrics partXMetrics =
+ (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "x");
+ QueueMetrics partDefaultMetrics =
+ (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "");
+ QueueMetrics queueAPartDefaultMetrics =
+ (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a");
+ QueueMetrics queueAPartXMetrics =
+ (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a");
+ QueueMetrics queueA1PartDefaultMetrics =
+ (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a.a1");
+ QueueMetrics queueA1PartXMetrics =
+ (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a.a1");
+ QueueMetrics queueRootPartDefaultMetrics =
+ (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root");
+ QueueMetrics queueRootPartXMetrics =
+ (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root");
+ QueueMetrics queueAMetrics =
+ (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a");
+ QueueMetrics queueA1Metrics =
+ (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a.a1");
+ QueueMetrics queueRootMetrics =
+ (QueueMetrics) TestQueueMetrics.queueSource(ms, "root");
+ assertEquals(12 * GB, queueAMetrics.getAvailableMB());
+ assertEquals(12 * GB, queueA1Metrics.getAvailableMB());
+ assertEquals(12 * GB, queueRootMetrics.getAvailableMB());
+ assertEquals(12 * GB, parentQueueA.getMetrics().getAvailableMB());
+ assertEquals(10 * GB, queueA1PartXMetrics.getAvailableMB());
+ assertEquals(10 * GB, queueAPartXMetrics.getAvailableMB());
+ assertEquals(10 * GB, queueRootPartXMetrics.getAvailableMB());
+ assertEquals(12 * GB, queueA1PartDefaultMetrics.getAvailableMB());
+ assertEquals(12 * GB, queueAPartDefaultMetrics.getAvailableMB());
+ assertEquals(12 * GB, queueRootPartDefaultMetrics.getAvailableMB());
+ assertEquals(10 * GB, partXMetrics.getAvailableMB());
+ assertEquals(12 * GB, partDefaultMetrics.getAvailableMB());
+
+ // app1 -> a1, partition=x
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // app1 asks for 5 partition=x containers
+ am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
+ // NM1 does 50 heartbeats
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+
+ for (int i = 0; i < 50; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+
+ // app1 gets all its containers (AM + 5 tasks) in partition=x
+ Assert.assertEquals(6, schedulerNode1.getNumContainers());
+
+ SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
+ .getNodeReport(nm1.getNodeId());
+ Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
+ Assert.assertEquals(4 * GB, reportNm1.getAvailableResource().getMemorySize());
+
+ SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
+ .getNodeReport(nm2.getNodeId());
+ Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize());
+ Assert.assertEquals(12 * GB,
+ reportNm2.getAvailableResource().getMemorySize());
+
+ // Partition-x usage must not leak into default-partition metrics
+ assertEquals(0 * GB, queueAMetrics.getAllocatedMB());
+ assertEquals(0 * GB, queueA1Metrics.getAllocatedMB());
+ assertEquals(0 * GB, queueRootMetrics.getAllocatedMB());
+ assertEquals(0 * GB, parentQueueA.getMetrics().getAllocatedMB());
+ assertEquals(6 * GB, queueA1PartXMetrics.getAllocatedMB());
+ assertEquals(6 * GB, queueAPartXMetrics.getAllocatedMB());
+ assertEquals(6 * GB, queueRootPartXMetrics.getAllocatedMB());
+ assertEquals(0 * GB, queueA1PartDefaultMetrics.getAllocatedMB());
+ assertEquals(0 * GB, queueAPartDefaultMetrics.getAllocatedMB());
+ assertEquals(0 * GB, queueRootPartDefaultMetrics.getAllocatedMB());
+ assertEquals(6 * GB, partXMetrics.getAllocatedMB());
+ assertEquals(0 * GB, partDefaultMetrics.getAllocatedMB());
+
+ // app2 -> a1, default partition
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // app2 asks for 5 default-partition containers
+ am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "");
+ // NM2 does 50 heartbeats
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+ SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
+
+ for (int i = 0; i < 50; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // app2 gets all its containers (AM + 5 tasks) in the default partition
+ Assert.assertEquals(6, schedulerNode2.getNumContainers());
+
+ reportNm1 = rm1.getResourceScheduler().getNodeReport(nm1.getNodeId());
+ Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
+ Assert.assertEquals(4 * GB,
+ reportNm1.getAvailableResource().getMemorySize());
+
+ reportNm2 = rm1.getResourceScheduler().getNodeReport(nm2.getNodeId());
+ Assert.assertEquals(6 * GB, reportNm2.getUsedResource().getMemorySize());
+ Assert.assertEquals(6 * GB,
+ reportNm2.getAvailableResource().getMemorySize());
+
+ assertEquals(6 * GB, parentQueueA.getMetrics().getAvailableMB());
+ assertEquals(6 * GB, parentQueueA.getMetrics().getAllocatedMB());
+
+ // The total memory tracked by QueueMetrics is 12GB
+ // for the default partition
+ CSQueue rootQueue = cs.getRootQueue();
+ assertEquals(12 * GB, rootQueue.getMetrics().getAvailableMB()
+ + rootQueue.getMetrics().getAllocatedMB());
+ assertEquals(6 * GB, queueAMetrics.getAllocatedMB());
+ assertEquals(6 * GB, queueA1Metrics.getAllocatedMB());
+ assertEquals(6 * GB, queueRootMetrics.getAllocatedMB());
+ assertEquals(6 * GB, queueA1PartXMetrics.getAllocatedMB());
+ assertEquals(6 * GB, queueAPartXMetrics.getAllocatedMB());
+ assertEquals(6 * GB, queueRootPartXMetrics.getAllocatedMB());
+ assertEquals(6 * GB, queueA1PartDefaultMetrics.getAllocatedMB());
+ assertEquals(6 * GB, queueAPartDefaultMetrics.getAllocatedMB());
+ assertEquals(6 * GB, queueRootPartDefaultMetrics.getAllocatedMB());
+ assertEquals(6 * GB, partXMetrics.getAllocatedMB());
+ assertEquals(6 * GB, partDefaultMetrics.getAllocatedMB());
+
+ // Kill all apps in queue a1; wait for both to leave the scheduler
+ cs.killAllAppsInQueue("a1");
+ rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+ rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());
+ rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED);
+ rm1.waitForAppRemovedFromScheduler(app2.getApplicationId());
+ // Both apps were submitted to leaf a1, so verify its AM usage is released
+ assertEquals(0 * GB, leafQueueA1.getMetrics().getUsedAMResourceMB());
+ assertEquals(0, leafQueueA1.getMetrics().getUsedAMResourceVCores());
+ rm1.close();
+ }
+
+ @Test
public void testQueueMetricsWithLabelsDisableElasticity() throws Exception {
/**
* Test case: have a following queue structure:
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org